net: Support IPv6 #963
- NetworkAddress now contains IPAddress object which can be either IPv4 or IPv6 address. 128bits are used even for IPv4 addresses, however only 32bits are used when using/serializing IPv4 address. - ConnectPacket is updated to store IPv6 address. Backward compatible with old format since the first 32bits of IP address field is used for serialization of IPv4. - Mainly updates rest of the code to use IPAddress structure instead of plain uint32_t. - IPv6 address/pair ports should be represented as `[ip]:port` as per convention. This applies to both cluster files and command line arguments.
This commit is contained in:
parent
1a550712cb
commit
57832e625d
|
@ -2031,7 +2031,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
wait( makeInterruptable(waitForExcludedServers(db,addresses)) );
|
||||
|
||||
std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
|
||||
std::map<uint32_t, std::set<uint16_t>> workerPorts;
|
||||
std::map<IPAddress, std::set<uint16_t>> workerPorts;
|
||||
for(auto addr : workers)
|
||||
workerPorts[addr.address.ip].insert(addr.address.port);
|
||||
|
||||
|
@ -2050,7 +2050,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
"excluded the correct machines or processes before removing them from the cluster:\n");
|
||||
for(auto addr : absentExclusions) {
|
||||
if(addr.port == 0)
|
||||
printf(" %s\n", toIPString(addr.ip).c_str());
|
||||
printf(" %s\n", addr.ip.toString().c_str());
|
||||
else
|
||||
printf(" %s\n", addr.toString().c_str());
|
||||
}
|
||||
|
|
|
@ -28,13 +28,21 @@
|
|||
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
|
||||
uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ) {
|
||||
IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs) {
|
||||
try {
|
||||
boost::asio::io_service ioService;
|
||||
boost::asio::ip::udp::socket socket(ioService);
|
||||
boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4(ccs.coordinators()[0].ip), ccs.coordinators()[0].port);
|
||||
using namespace boost::asio;
|
||||
|
||||
io_service ioService;
|
||||
ip::udp::socket socket(ioService);
|
||||
|
||||
const auto& coordAddr = ccs.coordinators()[0];
|
||||
const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6()))
|
||||
: ip::address(ip::address_v4(coordAddr.ip.toV4()));
|
||||
|
||||
ip::udp::endpoint endpoint(boostIp, coordAddr.port);
|
||||
socket.connect(endpoint);
|
||||
auto ip = socket.local_endpoint().address().to_v4().to_ulong();
|
||||
IPAddress ip = coordAddr.ip.isV6() ? IPAddress(socket.local_endpoint().address().to_v6().to_bytes())
|
||||
: IPAddress(socket.local_endpoint().address().to_v4().to_ulong());
|
||||
socket.close();
|
||||
|
||||
return ip;
|
||||
|
@ -43,4 +51,4 @@ uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ) {
|
|||
fprintf(stderr, "Error determining public address: %s\n", e.what());
|
||||
throw bind_failed();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -633,33 +633,21 @@ struct LogMessageVersion {
|
|||
};
|
||||
|
||||
struct AddressExclusion {
|
||||
uint32_t ip;
|
||||
IPAddress ip;
|
||||
int port;
|
||||
|
||||
AddressExclusion() : ip(0), port(0) {}
|
||||
explicit AddressExclusion( uint32_t ip ) : ip(ip), port(0) {}
|
||||
explicit AddressExclusion( uint32_t ip, int port ) : ip(ip), port(port) {}
|
||||
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
|
||||
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
|
||||
|
||||
explicit AddressExclusion (std::string s) {
|
||||
int a,b,c,d,p,count=-1;
|
||||
if (sscanf(s.c_str(), "%d.%d.%d.%d:%d%n", &a,&b,&c,&d, &p, &count) == 5 && count == s.size()) {
|
||||
ip = (a<<24)+(b<<16)+(c<<8)+d;
|
||||
port = p;
|
||||
}
|
||||
else if (sscanf(s.c_str(), "%d.%d.%d.%d%n", &a,&b,&c,&d, &count) == 4 && count == s.size()) {
|
||||
ip = (a<<24)+(b<<16)+(c<<8)+d;
|
||||
port = 0;
|
||||
}
|
||||
else {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
bool operator<(AddressExclusion const& r) const {
|
||||
if (ip != r.ip) return ip < r.ip;
|
||||
return port < r.port;
|
||||
}
|
||||
|
||||
bool operator< (AddressExclusion const& r) const { if (ip != r.ip) return ip < r.ip; return port<r.port; }
|
||||
bool operator== (AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
|
||||
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
|
||||
|
||||
bool isWholeMachine() const { return port == 0; }
|
||||
bool isValid() const { return ip != 0 || port != 0; }
|
||||
bool isValid() const { return ip.isValid() || port != 0; }
|
||||
|
||||
bool excludes( NetworkAddress const& addr ) const {
|
||||
if(isWholeMachine())
|
||||
|
@ -669,9 +657,9 @@ struct AddressExclusion {
|
|||
|
||||
// This is for debugging and IS NOT to be used for serialization to persistant state
|
||||
std::string toString() const {
|
||||
std::string as = format( "%d.%d.%d.%d", (ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff );
|
||||
if (!isWholeMachine())
|
||||
as += format(":%d", port);
|
||||
std::string as = format("%s", ip.toString().c_str());
|
||||
const char* formatPatt = ip.isV6() ? "[%s]:%d" : "%s:%d";
|
||||
if (!isWholeMachine()) return format(formatPatt, as.c_str(), port);
|
||||
return as;
|
||||
}
|
||||
|
||||
|
|
|
@ -1730,7 +1730,7 @@ TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") {
|
|||
data.locality.set(LiteralStringRef("rack"), StringRef(rack));
|
||||
data.locality.set(LiteralStringRef("zoneid"), StringRef(rack));
|
||||
data.locality.set(LiteralStringRef("machineid"), StringRef(machineId));
|
||||
data.address.ip = i;
|
||||
data.address.ip = IPAddress(i);
|
||||
|
||||
workers.push_back(data);
|
||||
}
|
||||
|
@ -1749,8 +1749,8 @@ TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") {
|
|||
LiteralStringRef("machineid")
|
||||
});
|
||||
for(auto worker = chosen.begin(); worker != chosen.end(); worker++) {
|
||||
ASSERT(worker->ip < workers.size());
|
||||
LocalityData data = workers[worker->ip].locality;
|
||||
ASSERT(worker->ip.toV4() < workers.size());
|
||||
LocalityData data = workers[worker->ip.toV4()].locality;
|
||||
for(auto field = fields.begin(); field != fields.end(); field++) {
|
||||
chosenValues[*field].insert(data.get(*field).get());
|
||||
}
|
||||
|
|
|
@ -214,6 +214,28 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
|
|||
ASSERT( input == cs.toString() );
|
||||
}
|
||||
|
||||
{
|
||||
input = "0xxdeadbeef:100100100@[::1]:1234,[::1]:1235";
|
||||
std::string commented("#start of comment\n");
|
||||
commented += input;
|
||||
commented += "\n";
|
||||
commented += "# asdfasdf ##";
|
||||
|
||||
ClusterConnectionString cs(commented);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
{
|
||||
input = "0xxdeadbeef:100100100@[abcd:dcba::1]:1234,[abcd:dcba::abcd:1]:1234";
|
||||
std::string commented("#start of comment\n");
|
||||
commented += input;
|
||||
commented += "\n";
|
||||
commented += "# asdfasdf ##";
|
||||
|
||||
ClusterConnectionString cs(commented);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -749,7 +749,7 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, Loc
|
|||
return Database::createDatabase(rccf, apiVersion, clientLocality);
|
||||
}
|
||||
|
||||
extern uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs );
|
||||
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
|
||||
|
||||
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion )
|
||||
: clusterInterface(new AsyncVar<Optional<ClusterInterface>>())
|
||||
|
@ -791,7 +791,7 @@ void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientI
|
|||
.detailf("ImageOffset", "%p", platform::getImageOffset())
|
||||
.trackLatest("ClientStart");
|
||||
|
||||
initializeSystemMonitorMachineState(SystemMonitorMachineState(publicIP));
|
||||
initializeSystemMonitorMachineState(SystemMonitorMachineState(IPAddress(publicIP)));
|
||||
|
||||
systemMonitor();
|
||||
uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
|
||||
|
@ -1066,24 +1066,15 @@ bool GetRangeLimits::hasSatisfiedMinRows() {
|
|||
return hasByteLimit() && minRows == 0;
|
||||
}
|
||||
|
||||
|
||||
AddressExclusion AddressExclusion::parse( StringRef const& key ) {
|
||||
//Must not change: serialized to the database!
|
||||
std::string s = key.toString();
|
||||
int a,b,c,d,port,count=-1;
|
||||
if (sscanf(s.c_str(), "%d.%d.%d.%d%n", &a,&b,&c,&d, &count)<4) {
|
||||
try {
|
||||
auto addr = NetworkAddress::parse(key.toString());
|
||||
return AddressExclusion(addr.ip, addr.port);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", printable(key));
|
||||
return AddressExclusion();
|
||||
}
|
||||
s = s.substr(count);
|
||||
uint32_t ip = (a<<24)+(b<<16)+(c<<8)+d;
|
||||
if (!s.size())
|
||||
return AddressExclusion( ip );
|
||||
if (sscanf( s.c_str(), ":%d%n", &port, &count ) < 1 || count != s.size()) {
|
||||
TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", printable(key));
|
||||
return AddressExclusion();
|
||||
}
|
||||
return AddressExclusion( ip, port );
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> getRange(
|
||||
|
@ -2038,7 +2029,7 @@ ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key
|
|||
|
||||
Standalone<VectorRef<const char*>> addresses;
|
||||
for (auto i : ssi) {
|
||||
std::string ipString = toIPString(i.address().ip);
|
||||
std::string ipString = i.address().ip.toString();
|
||||
char* c_string = new (addresses.arena()) char[ipString.length()+1];
|
||||
strcpy(c_string, ipString.c_str());
|
||||
addresses.push_back(addresses.arena(), c_string);
|
||||
|
|
|
@ -374,7 +374,7 @@ const AddressExclusion decodeExcludedServersKey( KeyRef const& key ) {
|
|||
}
|
||||
std::string encodeExcludedServersKey( AddressExclusion const& addr ) {
|
||||
//FIXME: make sure what's persisted here is not affected by innocent changes elsewhere
|
||||
std::string as = format( "%d.%d.%d.%d", (addr.ip>>24)&0xff, (addr.ip>>16)&0xff, (addr.ip>>8)&0xff, addr.ip&0xff );
|
||||
std::string as = format("%s", addr.ip.toString().c_str());
|
||||
//ASSERT( StringRef(as).endsWith(LiteralStringRef(":0")) == (addr.port == 0) );
|
||||
if (!addr.isWholeMachine())
|
||||
as += format(":%d", addr.port);
|
||||
|
|
|
@ -194,10 +194,7 @@ public:
|
|||
};
|
||||
|
||||
#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
|
||||
#define CONNECT_PACKET_V1 0x0FDB00A446030001LL
|
||||
#define CONNECT_PACKET_V0_SIZE 14
|
||||
#define CONNECT_PACKET_V1_SIZE 22
|
||||
#define CONNECT_PACKET_V2_SIZE 26
|
||||
|
||||
#pragma pack( push, 1 )
|
||||
struct ConnectPacket {
|
||||
|
@ -205,16 +202,44 @@ struct ConnectPacket {
|
|||
uint64_t protocolVersion; // Expect currentProtocolVersion
|
||||
uint16_t canonicalRemotePort; // Port number to reconnect to the originating process
|
||||
uint64_t connectionId; // Multi-version clients will use the same Id for both connections, other connections will set this to zero. Added at protocol Version 0x0FDB00A444020001.
|
||||
uint32_t canonicalRemoteIp; // IP Address to reconnect to the originating process
|
||||
union {
|
||||
uint32_t v4;
|
||||
uint8_t v6[16];
|
||||
} canonicalRemoteIp46; // IP Address to reconnect to the originating process
|
||||
|
||||
size_t minimumSize() {
|
||||
if (protocolVersion < CONNECT_PACKET_V0) return CONNECT_PACKET_V0_SIZE;
|
||||
if (protocolVersion < CONNECT_PACKET_V1) return CONNECT_PACKET_V1_SIZE;
|
||||
return CONNECT_PACKET_V2_SIZE;
|
||||
IPAddress canonicalRemoteIp() const {
|
||||
if (isIPv6()) {
|
||||
IPAddress::IPAddressStore ip;
|
||||
memcpy(ip.data(), &canonicalRemoteIp46.v6, ip.size());
|
||||
return IPAddress(ip);
|
||||
} else {
|
||||
return IPAddress(canonicalRemoteIp46.v4);
|
||||
}
|
||||
}
|
||||
|
||||
void setCanonicalRemoteIp(const IPAddress& ip) {
|
||||
if (ip.isV6()) {
|
||||
memcpy(&canonicalRemoteIp46.v6, ip.toV6().data(), 16);
|
||||
} else {
|
||||
canonicalRemoteIp46.v4 = ip.toV4();
|
||||
}
|
||||
}
|
||||
|
||||
bool isIPv6() const { return connectPacketLength == (sizeof(ConnectPacket) - sizeof(connectPacketLength)); }
|
||||
|
||||
uint32_t totalPacketSize() const { return connectPacketLength + sizeof(connectPacketLength); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, connectPacketLength, protocolVersion, canonicalRemotePort, connectionId);
|
||||
if (isIPv6()) {
|
||||
ar.serializeBytes(&canonicalRemoteIp46.v6, sizeof(canonicalRemoteIp46.v6));
|
||||
} else {
|
||||
serializer(ar, canonicalRemoteIp46.v4);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static_assert( sizeof(ConnectPacket) == CONNECT_PACKET_V2_SIZE, "ConnectPacket packed incorrectly" );
|
||||
#pragma pack( pop )
|
||||
|
||||
ACTOR static Future<Void> connectionReader(TransportData* transport, Reference<IConnection> conn, Peer* peer,
|
||||
|
@ -256,23 +281,24 @@ struct Peer : NonCopyable {
|
|||
for(auto& addr : transport->localAddresses) {
|
||||
if(addr.isTLS() == destination.isTLS()) {
|
||||
pkt.canonicalRemotePort = addr.port;
|
||||
pkt.canonicalRemoteIp = addr.ip;
|
||||
pkt.setCanonicalRemoteIp(addr.ip);
|
||||
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength) - (addr.isV6() ? 0 : 12);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
pkt.canonicalRemotePort = 0; // a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
|
||||
pkt.canonicalRemoteIp = 0;
|
||||
pkt.setCanonicalRemoteIp(IPAddress(0));
|
||||
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
|
||||
}
|
||||
|
||||
pkt.connectPacketLength = sizeof(pkt)-sizeof(pkt.connectPacketLength);
|
||||
pkt.protocolVersion = currentProtocolVersion;
|
||||
pkt.connectionId = transport->transportId;
|
||||
|
||||
PacketBuffer* pb_first = new PacketBuffer;
|
||||
PacketWriter wr( pb_first, NULL, Unversioned() );
|
||||
wr.serializeBinaryItem(pkt);
|
||||
pkt.serialize(wr);
|
||||
unsent.prependWriteBuffer(pb_first, wr.finish());
|
||||
}
|
||||
|
||||
|
@ -647,7 +673,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
ConnectPacket* p = (ConnectPacket*)unprocessed_begin;
|
||||
|
||||
uint64_t connectionId = 0;
|
||||
int32_t connectPacketSize = p->minimumSize();
|
||||
int32_t connectPacketSize = p->totalPacketSize();
|
||||
if ( unprocessed_end-unprocessed_begin >= connectPacketSize ) {
|
||||
if(p->protocolVersion >= 0x0FDB00A444020001) {
|
||||
connectionId = p->connectionId;
|
||||
|
@ -655,18 +681,22 @@ ACTOR static Future<Void> connectionReader(
|
|||
|
||||
if( (p->protocolVersion & compatibleProtocolVersionMask) != (currentProtocolVersion & compatibleProtocolVersionMask) ) {
|
||||
incompatibleProtocolVersionNewer = p->protocolVersion > currentProtocolVersion;
|
||||
NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress();
|
||||
NetworkAddress addr = p->canonicalRemotePort
|
||||
? NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort)
|
||||
: conn->getPeerAddress();
|
||||
if(connectionId != 1) addr.port = 0;
|
||||
|
||||
if(!transport->multiVersionConnections.count(connectionId)) {
|
||||
if(now() - transport->lastIncompatibleMessage > FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) {
|
||||
TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID())
|
||||
.detail("Reason", "IncompatibleProtocolVersion")
|
||||
.detail("LocalVersion", currentProtocolVersion)
|
||||
.detail("RejectedVersion", p->protocolVersion)
|
||||
.detail("VersionMask", compatibleProtocolVersionMask)
|
||||
.detail("Peer", p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress())
|
||||
.detail("ConnectionId", connectionId);
|
||||
.detail("Reason", "IncompatibleProtocolVersion")
|
||||
.detail("LocalVersion", currentProtocolVersion)
|
||||
.detail("RejectedVersion", p->protocolVersion)
|
||||
.detail("VersionMask", compatibleProtocolVersionMask)
|
||||
.detail("Peer", p->canonicalRemotePort ? NetworkAddress(p->canonicalRemoteIp(),
|
||||
p->canonicalRemotePort)
|
||||
: conn->getPeerAddress())
|
||||
.detail("ConnectionId", connectionId);
|
||||
transport->lastIncompatibleMessage = now();
|
||||
}
|
||||
if(!transport->incompatiblePeers.count(addr)) {
|
||||
|
@ -699,7 +729,9 @@ ACTOR static Future<Void> connectionReader(
|
|||
peerProtocolVersion = p->protocolVersion;
|
||||
if (peer != nullptr) {
|
||||
// Outgoing connection; port information should be what we expect
|
||||
TraceEvent("ConnectedOutgoing").suppressFor(1.0).detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) );
|
||||
TraceEvent("ConnectedOutgoing")
|
||||
.suppressFor(1.0)
|
||||
.detail("PeerAddr", NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort));
|
||||
peer->compatible = compatible;
|
||||
peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer;
|
||||
if (!compatible) {
|
||||
|
@ -709,7 +741,8 @@ ACTOR static Future<Void> connectionReader(
|
|||
ASSERT( p->canonicalRemotePort == peerAddress.port );
|
||||
} else {
|
||||
if (p->canonicalRemotePort) {
|
||||
peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() );
|
||||
peerAddress = NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort, true,
|
||||
peerAddress.isTLS());
|
||||
}
|
||||
peer = transport->getPeer(peerAddress);
|
||||
peer->compatible = compatible;
|
||||
|
|
|
@ -177,7 +177,7 @@ Future<Reference<IConnection>> TLSNetworkConnections::connect( NetworkAddress to
|
|||
// addresses against certificates, so we have our own peer verifying logic
|
||||
// to use. For FDB<->external system connections, we can use the standard
|
||||
// hostname-based certificate verification logic.
|
||||
if (host.empty() || host == toIPString(toAddr.ip))
|
||||
if (host.empty() || host == toAddr.ip.toString())
|
||||
return wrap(options->get_policy(TLSOptions::POLICY_VERIFY_PEERS), true, network->connect(clearAddr), std::string(""));
|
||||
else
|
||||
return wrap( options->get_policy(TLSOptions::POLICY_NO_VERIFY_PEERS), true, network->connect( clearAddr ), host );
|
||||
|
|
|
@ -135,28 +135,29 @@ struct SimClogging {
|
|||
return t - tnow;
|
||||
}
|
||||
|
||||
void clogPairFor( uint32_t from, uint32_t to, double t ) {
|
||||
void clogPairFor(const IPAddress& from, const IPAddress& to, double t) {
|
||||
auto& u = clogPairUntil[ std::make_pair( from, to ) ];
|
||||
u = std::max(u, now() + t);
|
||||
}
|
||||
void clogSendFor( uint32_t from, double t ) {
|
||||
void clogSendFor(const IPAddress& from, double t) {
|
||||
auto& u = clogSendUntil[from];
|
||||
u = std::max(u, now() + t);
|
||||
}
|
||||
void clogRecvFor( uint32_t from, double t ) {
|
||||
void clogRecvFor(const IPAddress& from, double t) {
|
||||
auto& u = clogRecvUntil[from];
|
||||
u = std::max(u, now() + t);
|
||||
}
|
||||
double setPairLatencyIfNotSet( uint32_t from, uint32_t to, double t ) {
|
||||
double setPairLatencyIfNotSet(const IPAddress& from, const IPAddress& to, double t) {
|
||||
auto i = clogPairLatency.find( std::make_pair(from,to) );
|
||||
if (i == clogPairLatency.end())
|
||||
i = clogPairLatency.insert( std::make_pair( std::make_pair(from,to), t ) ).first;
|
||||
return i->second;
|
||||
}
|
||||
|
||||
private:
|
||||
std::map< uint32_t, double > clogSendUntil, clogRecvUntil;
|
||||
std::map< std::pair<uint32_t, uint32_t>, double > clogPairUntil;
|
||||
std::map< std::pair<uint32_t, uint32_t>, double > clogPairLatency;
|
||||
std::map<IPAddress, double> clogSendUntil, clogRecvUntil;
|
||||
std::map<std::pair<IPAddress, IPAddress>, double> clogPairUntil;
|
||||
std::map<std::pair<IPAddress, IPAddress>, double> clogPairLatency;
|
||||
double halfLatency() {
|
||||
double a = g_random->random01();
|
||||
const double pFast = 0.999;
|
||||
|
@ -789,9 +790,10 @@ public:
|
|||
Reference<Sim2Conn> myc( new Sim2Conn( getCurrentProcess() ) );
|
||||
Reference<Sim2Conn> peerc( new Sim2Conn( peerp ) );
|
||||
|
||||
// TODO Support IPv6
|
||||
myc->connect(peerc, toAddr);
|
||||
peerc->connect(myc, NetworkAddress( getCurrentProcess()->address.ip + g_random->randomInt(0,256),
|
||||
g_random->randomInt(40000, 60000) ));
|
||||
IPAddress localIp(getCurrentProcess()->address.ip.toV4() + g_random->randomInt(0, 256));
|
||||
peerc->connect(myc, NetworkAddress(localIp, g_random->randomInt(40000, 60000)));
|
||||
|
||||
((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*g_random->random01(), Reference<IConnection>(peerc) );
|
||||
return onConnect( ::delay(0.5*g_random->random01()), myc );
|
||||
|
@ -1499,22 +1501,24 @@ public:
|
|||
|
||||
return (kt == ktMin);
|
||||
}
|
||||
virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) {
|
||||
virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) {
|
||||
if (mode == ClogDefault) {
|
||||
double a = g_random->random01();
|
||||
if ( a < 0.3 ) mode = ClogSend;
|
||||
else if (a < 0.6 ) mode = ClogReceive;
|
||||
else mode = ClogAll;
|
||||
}
|
||||
TraceEvent("ClogInterface").detail("IP", toIPString(ip)).detail("Delay", seconds)
|
||||
.detail("Queue", mode==ClogSend?"Send":mode==ClogReceive?"Receive":"All");
|
||||
TraceEvent("ClogInterface")
|
||||
.detail("IP", ip.toString())
|
||||
.detail("Delay", seconds)
|
||||
.detail("Queue", mode == ClogSend ? "Send" : mode == ClogReceive ? "Receive" : "All");
|
||||
|
||||
if (mode == ClogSend || mode==ClogAll)
|
||||
g_clogging.clogSendFor( ip, seconds );
|
||||
if (mode == ClogReceive || mode==ClogAll)
|
||||
g_clogging.clogRecvFor( ip, seconds );
|
||||
}
|
||||
virtual void clogPair( uint32_t from, uint32_t to, double seconds ) {
|
||||
virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) {
|
||||
g_clogging.clogPairFor( from, to, seconds );
|
||||
}
|
||||
virtual std::vector<ProcessInfo*> getAllProcesses() const {
|
||||
|
@ -1653,7 +1657,7 @@ public:
|
|||
INetwork *net2;
|
||||
|
||||
//Map from machine IP -> machine disk space info
|
||||
std::map<uint32_t, SimDiskSpace> diskSpaceMap;
|
||||
std::map<IPAddress, SimDiskSpace> diskSpaceMap;
|
||||
|
||||
//Whether or not yield has returned true during the current iteration of the run loop
|
||||
bool yielded;
|
||||
|
|
|
@ -114,8 +114,12 @@ public:
|
|||
|
||||
std::string toString() const {
|
||||
const NetworkAddress& address = addresses[0];
|
||||
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d",
|
||||
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared);
|
||||
return format(
|
||||
"name: %s address: %s:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d", name,
|
||||
address.ip.toString().c_str(), address.port,
|
||||
(locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"),
|
||||
(locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"),
|
||||
startingClass.toString().c_str(), excluded, cleared);
|
||||
}
|
||||
|
||||
// Members not for external use
|
||||
|
@ -256,8 +260,8 @@ public:
|
|||
allSwapsDisabled = true;
|
||||
}
|
||||
|
||||
virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) = 0;
|
||||
virtual void clogPair( uint32_t from, uint32_t to, double seconds ) = 0;
|
||||
virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0;
|
||||
virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0;
|
||||
virtual std::vector<ProcessInfo*> getAllProcesses() const = 0;
|
||||
virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) = 0;
|
||||
virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0;
|
||||
|
|
|
@ -289,7 +289,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
|
|||
const TraceEventFields& event = it->second;
|
||||
|
||||
try {
|
||||
std::string address = toIPString(it->first.ip);
|
||||
std::string address = it->first.ip.toString();
|
||||
// We will use the "physical" caluculated machine ID here to limit exposure to machineID repurposing
|
||||
std::string machineId = event.getValue("MachineID");
|
||||
|
||||
|
@ -1254,9 +1254,15 @@ namespace std
|
|||
{
|
||||
size_t operator()(const NetworkAddress& na) const
|
||||
{
|
||||
return (na.ip << 16) + na.port;
|
||||
}
|
||||
};
|
||||
int result = 0;
|
||||
if (na.ip.isV6()) {
|
||||
result = hashlittle(na.ip.toV6().data(), 16, 0);
|
||||
} else {
|
||||
result = na.ip.toV4();
|
||||
}
|
||||
return (result << 16) + na.port;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
ACTOR template <class iface>
|
||||
|
@ -1667,7 +1673,7 @@ static JsonBuilderArray getClientIssuesAsMessages( ProcessIssuesMap const& _issu
|
|||
std::map<std::string, std::vector<std::string>> deduplicatedIssues;
|
||||
|
||||
for(auto i : issues) {
|
||||
deduplicatedIssues[i.second.first].push_back(format("%s:%d", toIPString(i.first.ip).c_str(), i.first.port));
|
||||
deduplicatedIssues[i.second.first].push_back(format("%s:%d", i.first.ip.toString().c_str(), i.first.port));
|
||||
}
|
||||
|
||||
for (auto i : deduplicatedIssues) {
|
||||
|
|
|
@ -170,7 +170,7 @@ extern void copyTest();
|
|||
extern void versionedMapTest();
|
||||
extern void createTemplateDatabase();
|
||||
// FIXME: this really belongs in a header somewhere since it is actually used.
|
||||
extern uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs );
|
||||
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
|
||||
|
||||
extern const char* getHGVersion();
|
||||
|
||||
|
@ -776,7 +776,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(const Cl
|
|||
if (autoPublicAddress) {
|
||||
try {
|
||||
const NetworkAddress& parsedAddress = NetworkAddress::parse("0.0.0.0:" + publicAddressStr.substr(5));
|
||||
uint32_t publicIP = determinePublicIPAutomatically(connectionFile.getConnectionString());
|
||||
const IPAddress publicIP = determinePublicIPAutomatically(connectionFile.getConnectionString());
|
||||
publicNetworkAddresses.emplace_back(publicIP, parsedAddress.port, true, parsedAddress.isTLS());
|
||||
} catch (Error& e) {
|
||||
fprintf(stderr, "ERROR: could not determine public address automatically from `%s': %s\n", publicAddressStr.c_str(), e.what());
|
||||
|
|
|
@ -90,7 +90,8 @@ struct CpuProfilerWorkload : TestWorkload
|
|||
req.duration = 0; //unused
|
||||
|
||||
//The profiler output name will be the ip.port.prof
|
||||
req.outputFile = StringRef(toIPString(self->profilingWorkers[i].address().ip) + "." + format("%d", self->profilingWorkers[i].address().port) + ".profile.bin");
|
||||
req.outputFile = StringRef(self->profilingWorkers[i].address().ip.toString() + "." +
|
||||
format("%d", self->profilingWorkers[i].address().port) + ".profile.bin");
|
||||
|
||||
replies.push_back(self->profilingWorkers[i].clientInterface.profiler.tryGetReply(req));
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
|
|||
|
||||
std::map<Optional<Standalone<StringRef>>, AddressExclusion> machinesMap; // Locality Zone Id -> ip address
|
||||
std::vector<AddressExclusion> processAddrs; // IF (killProcesses) THEN ip:port ELSE ip addresses unique list of the machines
|
||||
std::map<uint32_t, Optional<Standalone<StringRef>>> ip_dcid;
|
||||
std::map<IPAddress, Optional<Standalone<StringRef>>> ip_dcid;
|
||||
auto processes = getServers();
|
||||
for(auto& it : processes) {
|
||||
AddressExclusion machineIp(it->address.ip);
|
||||
|
|
|
@ -91,13 +91,15 @@ struct SaveAndKillWorkload : TestWorkload {
|
|||
ini.SetValue(machineIdString, "dcUID", (process->locality.dcId().present()) ? process->locality.dcId().get().printable().c_str() : "");
|
||||
ini.SetValue(machineIdString, "zoneId", (process->locality.zoneId().present()) ? process->locality.zoneId().get().printable().c_str() : "");
|
||||
ini.SetValue(machineIdString, "mClass", format("%d", process->startingClass.classType()).c_str());
|
||||
ini.SetValue(machineIdString, format("ipAddr%d", process->address.port-1).c_str(), format("%d", process->address.ip).c_str());
|
||||
ini.SetValue(machineIdString, format("ipAddr%d", process->address.port - 1).c_str(),
|
||||
format("%d", process->address.ip.toV4()).c_str());
|
||||
ini.SetValue(machineIdString, format("%d", process->address.port-1).c_str(), process->dataFolder);
|
||||
ini.SetValue(machineIdString, format("c%d", process->address.port-1).c_str(), process->coordinationFolder);
|
||||
j++;
|
||||
}
|
||||
else {
|
||||
ini.SetValue(machineIdString, format("ipAddr%d", process->address.port-1).c_str(), format("%d", process->address.ip).c_str());
|
||||
ini.SetValue(machineIdString, format("ipAddr%d", process->address.port - 1).c_str(),
|
||||
format("%d", process->address.ip.toV4()).c_str());
|
||||
int oldValue = machines.find(machineId)->second;
|
||||
ini.SetValue(machineIdString, format("%d", process->address.port-1).c_str(), process->dataFolder);
|
||||
ini.SetValue(machineIdString, format("c%d", process->address.port-1).c_str(), process->coordinationFolder);
|
||||
|
|
|
@ -172,7 +172,7 @@ public:
|
|||
TDMetricCollection tdmetrics;
|
||||
double currentTime;
|
||||
bool stopped;
|
||||
std::map< uint32_t, bool > addressOnHostCache;
|
||||
std::map<IPAddress, bool> addressOnHostCache;
|
||||
|
||||
uint64_t numYields;
|
||||
|
||||
|
@ -226,8 +226,16 @@ public:
|
|||
std::vector<std::string> blobCredentialFiles;
|
||||
};
|
||||
|
||||
static boost::asio::ip::address tcpAddress(IPAddress const& n) {
|
||||
if (n.isV6()) {
|
||||
return boost::asio::ip::address_v6(n.toV6());
|
||||
} else {
|
||||
return boost::asio::ip::address_v4(n.toV4());
|
||||
}
|
||||
}
|
||||
|
||||
static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) {
|
||||
return tcp::endpoint( boost::asio::ip::address_v4( n.ip ), n.port );
|
||||
return tcp::endpoint(tcpAddress(n.ip), n.port);
|
||||
}
|
||||
|
||||
class BindPromise {
|
||||
|
@ -458,7 +466,9 @@ private:
|
|||
auto f = p.getFuture();
|
||||
self->acceptor.async_accept( conn->getSocket(), peer_endpoint, std::move(p) );
|
||||
wait( f );
|
||||
conn->accept( NetworkAddress(peer_endpoint.address().to_v4().to_ulong(), peer_endpoint.port()) );
|
||||
auto peer_address = peer_endpoint.address().is_v6() ? IPAddress(peer_endpoint.address().to_v6().to_bytes())
|
||||
: IPAddress(peer_endpoint.address().to_v4().to_ulong());
|
||||
conn->accept(NetworkAddress(peer_address, peer_endpoint.port()));
|
||||
|
||||
return conn;
|
||||
} catch (...) {
|
||||
|
@ -850,13 +860,14 @@ ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl( Net2 *
|
|||
}
|
||||
|
||||
std::vector<NetworkAddress> addrs;
|
||||
|
||||
|
||||
tcp::resolver::iterator end;
|
||||
while(iter != end) {
|
||||
auto endpoint = iter->endpoint();
|
||||
// Currently only ipv4 is supported by NetworkAddress
|
||||
auto addr = endpoint.address();
|
||||
if(addr.is_v4()) {
|
||||
if (addr.is_v6()) {
|
||||
addrs.push_back(NetworkAddress(IPAddress(addr.to_v6().to_bytes()), endpoint.port()));
|
||||
} else {
|
||||
addrs.push_back(NetworkAddress(addr.to_v4().to_ulong(), endpoint.port()));
|
||||
}
|
||||
++iter;
|
||||
|
@ -890,9 +901,10 @@ bool Net2::isAddressOnThisHost( NetworkAddress const& addr ) {
|
|||
try {
|
||||
boost::asio::io_service ioService;
|
||||
boost::asio::ip::udp::socket socket(ioService);
|
||||
boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4(addr.ip), 1);
|
||||
boost::asio::ip::udp::endpoint endpoint(tcpAddress(addr.ip), 1);
|
||||
socket.connect(endpoint);
|
||||
bool local = socket.local_endpoint().address().to_v4().to_ulong() == addr.ip;
|
||||
bool local = addr.ip.isV6() ? socket.local_endpoint().address().to_v6().to_bytes() == addr.ip.toV6()
|
||||
: socket.local_endpoint().address().to_v4().to_ulong() == addr.ip.toV4();
|
||||
socket.close();
|
||||
if (local) TraceEvent(SevInfo, "AddressIsOnHost").detail("Address", addr);
|
||||
return addressOnHostCache[ addr.ip ] = local;
|
||||
|
|
|
@ -499,7 +499,7 @@ void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) {
|
|||
}
|
||||
|
||||
#ifdef __unixish__
|
||||
const char* getInterfaceName(uint32_t _ip) {
|
||||
const char* getInterfaceName(const IPAddress& _ip) {
|
||||
INJECT_FAULT( platform_error, "getInterfaceName" );
|
||||
static char iname[20];
|
||||
|
||||
|
@ -514,9 +514,15 @@ const char* getInterfaceName(uint32_t _ip) {
|
|||
for (struct ifaddrs* iter = interfaces; iter; iter = iter->ifa_next) {
|
||||
if(!iter->ifa_addr)
|
||||
continue;
|
||||
if (iter->ifa_addr->sa_family == AF_INET) {
|
||||
if (iter->ifa_addr->sa_family == AF_INET && _ip.isV4()) {
|
||||
uint32_t ip = ntohl(((struct sockaddr_in*)iter->ifa_addr)->sin_addr.s_addr);
|
||||
if (ip == _ip) {
|
||||
if (ip == _ip.toV4()) {
|
||||
ifa_name = iter->ifa_name;
|
||||
break;
|
||||
}
|
||||
} else if (iter->ifa_addr->sa_family == AF_INET6 && _ip.isV6()) {
|
||||
struct sockaddr_in6* ifa_addr = (struct sockaddr_in6*)iter->ifa_addr;
|
||||
if (memcmp(_ip.toV6().data(), &ifa_addr->sin6_addr, 16) == 0) {
|
||||
ifa_name = iter->ifa_name;
|
||||
break;
|
||||
}
|
||||
|
@ -538,8 +544,8 @@ const char* getInterfaceName(uint32_t _ip) {
|
|||
#endif
|
||||
|
||||
#if defined(__linux__)
|
||||
void getNetworkTraffic(uint32_t ip, uint64_t& bytesSent, uint64_t& bytesReceived,
|
||||
uint64_t& outSegs, uint64_t& retransSegs) {
|
||||
void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& outSegs,
|
||||
uint64_t& retransSegs) {
|
||||
INJECT_FAULT( platform_error, "getNetworkTraffic" ); // Even though this function doesn't throw errors, the equivalents for other platforms do, and since all of our simulation testing is on Linux...
|
||||
const char* ifa_name = nullptr;
|
||||
try {
|
||||
|
@ -746,8 +752,8 @@ dev_t getDeviceId(std::string path) {
|
|||
#endif
|
||||
|
||||
#ifdef __APPLE__
|
||||
void getNetworkTraffic(uint32_t ip, uint64_t& bytesSent, uint64_t& bytesReceived,
|
||||
uint64_t& outSegs, uint64_t& retransSegs) {
|
||||
void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& outSegs,
|
||||
uint64_t& retransSegs) {
|
||||
INJECT_FAULT( platform_error, "getNetworkTraffic" );
|
||||
|
||||
const char* ifa_name = nullptr;
|
||||
|
@ -1095,7 +1101,7 @@ void initPdhStrings(SystemStatisticsState *state, std::string dataFolder) {
|
|||
}
|
||||
#endif
|
||||
|
||||
SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, SystemStatisticsState **statState) {
|
||||
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState** statState) {
|
||||
if( (*statState) == NULL )
|
||||
(*statState) = new SystemStatisticsState();
|
||||
SystemStatistics returnStats;
|
||||
|
@ -1189,7 +1195,7 @@ SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, System
|
|||
uint64_t machineOutSegs = (*statState)->machineLastOutSegs;
|
||||
uint64_t machineRetransSegs = (*statState)->machineLastRetransSegs;
|
||||
|
||||
getNetworkTraffic(ip, machineNowSent, machineNowReceived, machineOutSegs, machineRetransSegs);
|
||||
getNetworkTraffic(*ip, machineNowSent, machineNowReceived, machineOutSegs, machineRetransSegs);
|
||||
if( returnStats.initialized ) {
|
||||
returnStats.machineMegabitsSent = ((machineNowSent - (*statState)->machineLastSent) * 8e-6);
|
||||
returnStats.machineMegabitsReceived = ((machineNowReceived - (*statState)->machineLastReceived) * 8e-6);
|
||||
|
|
|
@ -245,7 +245,9 @@ struct SystemStatistics {
|
|||
|
||||
struct SystemStatisticsState;
|
||||
|
||||
SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, SystemStatisticsState **statState);
|
||||
class IPAddress;
|
||||
|
||||
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState **statState);
|
||||
|
||||
double getProcessorTimeThread();
|
||||
|
||||
|
|
|
@ -43,19 +43,18 @@ void systemMonitor() {
|
|||
|
||||
SystemStatistics getSystemStatistics() {
|
||||
static StatisticsState statState = StatisticsState();
|
||||
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
|
||||
return getSystemStatistics(
|
||||
machineState.folder.present() ? machineState.folder.get() : "",
|
||||
machineState.ip.present() ? machineState.ip.get() : 0,
|
||||
&statState.systemState);
|
||||
machineState.folder.present() ? machineState.folder.get() : "", &ipAddr, &statState.systemState);
|
||||
}
|
||||
|
||||
#define TRACEALLOCATOR( size ) TraceEvent("MemSample").detail("Count", FastAllocator<size>::getApproximateMemoryUnused()/size).detail("TotalSize", FastAllocator<size>::getApproximateMemoryUnused()).detail("SampleCount", 1).detail("Hash", "FastAllocatedUnused" #size ).detail("Bt", "na")
|
||||
#define DETAILALLOCATORMEMUSAGE( size ) detail("TotalMemory"#size, FastAllocator<size>::getTotalMemory()).detail("ApproximateUnusedMemory"#size, FastAllocator<size>::getApproximateMemoryUnused()).detail("ActiveThreads"#size, FastAllocator<size>::getActiveThreads())
|
||||
|
||||
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *statState, bool machineMetrics) {
|
||||
SystemStatistics currentStats = getSystemStatistics(machineState.folder.present() ? machineState.folder.get() : "",
|
||||
machineState.ip.present() ? machineState.ip.get() : 0,
|
||||
&statState->systemState);
|
||||
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
|
||||
SystemStatistics currentStats = getSystemStatistics(machineState.folder.present() ? machineState.folder.get() : "",
|
||||
&ipAddr, &statState->systemState);
|
||||
NetworkData netData;
|
||||
netData.init();
|
||||
if (!DEBUG_DETERMINISM && currentStats.initialized) {
|
||||
|
|
|
@ -29,14 +29,15 @@ struct SystemMonitorMachineState {
|
|||
Optional<std::string> folder;
|
||||
Optional<Standalone<StringRef>> zoneId;
|
||||
Optional<Standalone<StringRef>> machineId;
|
||||
Optional<uint32_t> ip;
|
||||
Optional<IPAddress> ip;
|
||||
|
||||
double monitorStartTime;
|
||||
|
||||
SystemMonitorMachineState() : monitorStartTime(0) {}
|
||||
SystemMonitorMachineState(uint32_t ip) : ip(ip), monitorStartTime(0) {}
|
||||
SystemMonitorMachineState(std::string folder, Optional<Standalone<StringRef>> zoneId, Optional<Standalone<StringRef>> machineId, uint32_t ip)
|
||||
: folder(folder), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {}
|
||||
explicit SystemMonitorMachineState(const IPAddress& ip) : ip(ip), monitorStartTime(0) {}
|
||||
SystemMonitorMachineState(std::string folder, Optional<Standalone<StringRef>> zoneId,
|
||||
Optional<Standalone<StringRef>> machineId, const IPAddress& ip)
|
||||
: folder(folder), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {}
|
||||
};
|
||||
|
||||
void initializeSystemMonitorMachineState(SystemMonitorMachineState machineState);
|
||||
|
|
|
@ -183,8 +183,7 @@ public:
|
|||
// Get and store the local address in the metric collection, but only if it is not 0.0.0.0:0
|
||||
if( address.size() == 0 ) {
|
||||
NetworkAddress addr = g_network->getLocalAddress();
|
||||
if(addr.ip != 0 && addr.port != 0)
|
||||
address = StringRef(addr.toString());
|
||||
if (addr.ip.isValid() && addr.port != 0) address = StringRef(addr.toString());
|
||||
}
|
||||
return address.size() != 0;
|
||||
}
|
||||
|
|
|
@ -333,7 +333,8 @@ public:
|
|||
|
||||
void annotateEvent( TraceEventFields &fields ) {
|
||||
if(localAddress.present()) {
|
||||
fields.addField("Machine", format("%d.%d.%d.%d:%d", (localAddress.get().ip>>24)&0xff, (localAddress.get().ip>>16)&0xff, (localAddress.get().ip>>8)&0xff, localAddress.get().ip&0xff, localAddress.get().port));
|
||||
fields.addField("Machine",
|
||||
format("%s:%d", localAddress.get().ip.toString().c_str(), localAddress.get().port));
|
||||
}
|
||||
|
||||
fields.addField("LogGroup", logGroup);
|
||||
|
@ -624,7 +625,7 @@ void openTraceFile(const NetworkAddress& na, uint64_t rollsize, uint64_t maxLogs
|
|||
if (baseOfBase.empty())
|
||||
baseOfBase = "trace";
|
||||
|
||||
std::string baseName = format("%s.%03d.%03d.%03d.%03d.%d", baseOfBase.c_str(), (na.ip>>24)&0xff, (na.ip>>16)&0xff, (na.ip>>8)&0xff, na.ip&0xff, na.port);
|
||||
std::string baseName = format("%s.%s.%d", baseOfBase.c_str(), na.ip.toString().c_str(), na.port);
|
||||
g_traceLog.open( directory, baseName, logGroup, format("%lld", time(NULL)), rollsize, maxLogsSize, !g_network->isSimulated() ? na : Optional<NetworkAddress>());
|
||||
|
||||
uncancellable(recurring(&flushTraceFile, FLOW_KNOBS->TRACE_FLUSH_INTERVAL, TaskFlushTrace));
|
||||
|
@ -716,7 +717,7 @@ bool TraceEvent::init() {
|
|||
detail("Type", type);
|
||||
if(g_network && g_network->isSimulated()) {
|
||||
NetworkAddress local = g_network->getLocalAddress();
|
||||
detailf("Machine", "%d.%d.%d.%d:%d", (local.ip>>24)&0xff, (local.ip>>16)&0xff, (local.ip>>8)&0xff, local.ip&0xff, local.port);
|
||||
detailf("Machine", "%s:%d", local.ip.toString().c_str(), local.port);
|
||||
}
|
||||
detail("ID", id);
|
||||
if(err.isValid()) {
|
||||
|
@ -1015,7 +1016,7 @@ void TraceBatch::dump() {
|
|||
std::string machine;
|
||||
if(g_network->isSimulated()) {
|
||||
NetworkAddress local = g_network->getLocalAddress();
|
||||
machine = format("%d.%d.%d.%d:%d", (local.ip>>24)&0xff,(local.ip>>16)&0xff,(local.ip>>8)&0xff,local.ip&0xff,local.port);
|
||||
machine = format("%s:%d", local.ip.toString().c_str(), local.port);
|
||||
}
|
||||
|
||||
for(int i = 0; i < attachBatch.size(); i++) {
|
||||
|
|
110
flow/network.cpp
110
flow/network.cpp
|
@ -18,8 +18,54 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "boost/asio.hpp"
|
||||
|
||||
#include "flow/network.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
IPAddress::IPAddress() : store({}), isV6addr(false) {}
|
||||
|
||||
IPAddress::IPAddress(const IPAddressStore& v6addr) : store(v6addr), isV6addr(true) {}
|
||||
|
||||
IPAddress::IPAddress(uint32_t v4addr) : store({}), isV6addr(false) {
|
||||
uint32_t* parts = (uint32_t*)store.data();
|
||||
parts[0] = v4addr;
|
||||
}
|
||||
|
||||
uint32_t IPAddress::toV4() const {
|
||||
const uint32_t* parts = (uint32_t*)store.data();
|
||||
return parts[0];
|
||||
}
|
||||
|
||||
bool IPAddress::operator==(const IPAddress& addr) const {
|
||||
return isV6addr == addr.isV6addr && store == addr.store;
|
||||
}
|
||||
|
||||
bool IPAddress::operator!=(const IPAddress& addr) const {
|
||||
return !(*this == addr);
|
||||
}
|
||||
|
||||
bool IPAddress::operator<(const IPAddress& addr) const {
|
||||
return isV6() == addr.isV6() ? store < addr.store : isV6() < addr.isV6();
|
||||
}
|
||||
|
||||
std::string IPAddress::toString() const {
|
||||
if (isV6()) {
|
||||
return boost::asio::ip::address_v6(store).to_string();
|
||||
} else {
|
||||
const uint32_t ip = toV4();
|
||||
return format("%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff);
|
||||
}
|
||||
}
|
||||
|
||||
bool IPAddress::isValid() const {
|
||||
if (!isV6()) {
|
||||
return toV4() != 0;
|
||||
}
|
||||
|
||||
return std::any_of(store.begin(), store.end(), [](uint8_t part) { return part > 0; });
|
||||
}
|
||||
|
||||
NetworkAddress NetworkAddress::parse( std::string const& s ) {
|
||||
bool isTLS = false;
|
||||
|
@ -27,12 +73,31 @@ NetworkAddress NetworkAddress::parse( std::string const& s ) {
|
|||
if( s.size() > 4 && strcmp(s.c_str() + s.size() - 4, ":tls") == 0 ) {
|
||||
isTLS = true;
|
||||
f = s.substr(0, s.size() - 4);
|
||||
} else
|
||||
} else {
|
||||
f = s;
|
||||
int a,b,c,d,port,count=-1;
|
||||
if (sscanf(f.c_str(), "%d.%d.%d.%d:%d%n", &a,&b,&c,&d, &port, &count)<5 || count != f.size())
|
||||
throw connection_string_invalid();
|
||||
return NetworkAddress( (a<<24)+(b<<16)+(c<<8)+d, port, true, isTLS );
|
||||
}
|
||||
|
||||
if (f[0] == '[') {
|
||||
// IPv6 address/port pair is represented as "[ip]:port"
|
||||
auto addrEnd = f.find_first_of(']');
|
||||
if (addrEnd == std::string::npos || f[addrEnd + 1] != ':') {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
|
||||
try {
|
||||
auto port = std::stoi(f.substr(addrEnd + 2));
|
||||
auto addr = boost::asio::ip::address::from_string(f.substr(1, addrEnd - 1));
|
||||
ASSERT(addr.is_v6());
|
||||
return NetworkAddress(IPAddress(addr.to_v6().to_bytes()), port, true, isTLS);
|
||||
} catch (...) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
} else {
|
||||
int a, b, c, d, port, count = -1;
|
||||
if (sscanf(f.c_str(), "%d.%d.%d.%d:%d%n", &a, &b, &c, &d, &port, &count) < 5 || count != f.size())
|
||||
throw connection_string_invalid();
|
||||
return NetworkAddress((a << 24) + (b << 16) + (c << 8) + d, port, true, isTLS);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<NetworkAddress> NetworkAddress::parseList( std::string const& addrs ) {
|
||||
|
@ -49,11 +114,13 @@ std::vector<NetworkAddress> NetworkAddress::parseList( std::string const& addrs
|
|||
}
|
||||
|
||||
std::string NetworkAddress::toString() const {
|
||||
return format( "%d.%d.%d.%d:%d%s", (ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff, port, isTLS() ? ":tls" : "" );
|
||||
}
|
||||
|
||||
std::string toIPString(uint32_t ip) {
|
||||
return format( "%d.%d.%d.%d", (ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff );
|
||||
const char* patt;
|
||||
if (isV6()) {
|
||||
patt = "[%s]:%d%s";
|
||||
} else {
|
||||
patt = "%s:%d%s";
|
||||
}
|
||||
return format(patt, ip.toString().c_str(), port, isTLS() ? ":tls" : "");
|
||||
}
|
||||
|
||||
std::string toIPVectorString(std::vector<uint32_t> ips) {
|
||||
|
@ -82,3 +149,26 @@ Future<Reference<IConnection>> INetworkConnections::connect( std::string host, s
|
|||
return connect(addr, host);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_CASE("/flow/network/ipaddress") {
|
||||
ASSERT(NetworkAddress::parse("[::1]:4800").toString() == "[::1]:4800");
|
||||
|
||||
{
|
||||
auto addr = "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4800";
|
||||
auto addrParsed = NetworkAddress::parse(addr);
|
||||
auto addrCompressed = "[2001:db8:85a3::8a2e:370:7334]:4800";
|
||||
ASSERT(addrParsed.isV6());
|
||||
ASSERT(!addrParsed.isTLS());
|
||||
ASSERT(addrParsed.toString() == addrCompressed);
|
||||
}
|
||||
|
||||
{
|
||||
auto addr = "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4800:tls";
|
||||
auto addrParsed = NetworkAddress::parse(addr);
|
||||
auto addrCompressed = "[2001:db8:85a3::8a2e:370:7334]:4800:tls";
|
||||
ASSERT(addrParsed.isV6());
|
||||
ASSERT(addrParsed.isTLS());
|
||||
ASSERT(addrParsed.toString() == addrCompressed);
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#define FLOW_OPENNETWORK_H
|
||||
#pragma once
|
||||
|
||||
#include <array>
|
||||
#include <string>
|
||||
#include <stdint.h>
|
||||
#include "flow/serialize.h"
|
||||
|
@ -75,25 +76,78 @@ enum {
|
|||
|
||||
class Void;
|
||||
|
||||
struct IPAddress {
|
||||
// Represents both IPv4 and IPv6 address. For IPv4 addresses,
|
||||
// only the first 32bits are relevant and rest are initialized to
|
||||
// 0.
|
||||
typedef std::array<uint8_t, 16> IPAddressStore;
|
||||
|
||||
IPAddress();
|
||||
explicit IPAddress(const IPAddressStore& v6addr);
|
||||
explicit IPAddress(uint32_t v4addr);
|
||||
|
||||
bool isV6() const { return isV6addr; }
|
||||
bool isV4() const { return !isV6addr; }
|
||||
bool isValid() const;
|
||||
|
||||
// Returns raw v4/v6 representation of address. Caller is responsible
|
||||
// to call these functions safely.
|
||||
uint32_t toV4() const;
|
||||
const IPAddressStore& toV6() const { return store; }
|
||||
|
||||
std::string toString() const;
|
||||
|
||||
bool operator==(const IPAddress& addr) const;
|
||||
bool operator!=(const IPAddress& addr) const;
|
||||
bool operator<(const IPAddress& addr) const;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, isV6addr);
|
||||
if (isV6addr) {
|
||||
serializer(ar, store);
|
||||
} else {
|
||||
uint32_t* parts = (uint32_t*)store.data();
|
||||
serializer(ar, parts[0]);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
bool isV6addr;
|
||||
IPAddressStore store;
|
||||
};
|
||||
|
||||
struct NetworkAddress {
|
||||
// A NetworkAddress identifies a particular running server (i.e. a TCP endpoint).
|
||||
uint32_t ip;
|
||||
IPAddress ip;
|
||||
uint16_t port;
|
||||
uint16_t flags;
|
||||
|
||||
enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 };
|
||||
|
||||
NetworkAddress() : ip(0), port(0), flags(FLAG_PRIVATE) {}
|
||||
NetworkAddress( uint32_t ip, uint16_t port ) : ip(ip), port(port), flags(FLAG_PRIVATE) {}
|
||||
NetworkAddress( uint32_t ip, uint16_t port, bool isPublic, bool isTLS ) : ip(ip), port(port),
|
||||
flags( (isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0 ) ) {}
|
||||
NetworkAddress() : ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE) {}
|
||||
NetworkAddress(const IPAddress& address, uint16_t port, bool isPublic, bool isTLS)
|
||||
: ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)) {}
|
||||
NetworkAddress(uint32_t ip, uint16_t port, bool isPublic, bool isTLS)
|
||||
: NetworkAddress(IPAddress(ip), port, isPublic, isTLS) {}
|
||||
|
||||
bool operator == (NetworkAddress const& r) const { return ip==r.ip && port==r.port && flags==r.flags; }
|
||||
bool operator != (NetworkAddress const& r) const { return ip!=r.ip || port!=r.port || flags!=r.flags; }
|
||||
bool operator< (NetworkAddress const& r) const { if (flags != r.flags) return flags < r.flags; if (ip != r.ip) return ip < r.ip; return port<r.port; }
|
||||
bool isValid() const { return ip != 0 || port != 0; }
|
||||
NetworkAddress(uint32_t ip, uint16_t port) : NetworkAddress(ip, port, false, false) {}
|
||||
NetworkAddress(const IPAddress& ip, uint16_t port) : NetworkAddress(ip, port, false, false) {}
|
||||
|
||||
bool operator==(NetworkAddress const& r) const { return ip == r.ip && port == r.port && flags == r.flags; }
|
||||
bool operator!=(NetworkAddress const& r) const { return ip != r.ip || port != r.port || flags != r.flags; }
|
||||
bool operator<(NetworkAddress const& r) const {
|
||||
if (flags != r.flags)
|
||||
return flags < r.flags;
|
||||
else if (ip != r.ip)
|
||||
return ip < r.ip;
|
||||
return port < r.port;
|
||||
}
|
||||
|
||||
bool isValid() const { return ip.isValid() || port != 0; }
|
||||
bool isPublic() const { return !(flags & FLAG_PRIVATE); }
|
||||
bool isTLS() const { return (flags & FLAG_TLS) != 0; }
|
||||
bool isV6() const { return ip.isV6(); }
|
||||
|
||||
static NetworkAddress parse( std::string const& );
|
||||
static std::vector<NetworkAddress> parseList( std::string const& );
|
||||
|
@ -101,13 +155,18 @@ struct NetworkAddress {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ar.serializeBinaryItem(*this);
|
||||
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
|
||||
uint32_t ipV4;
|
||||
serializer(ar, ipV4, port, flags);
|
||||
ip = IPAddress(ipV4);
|
||||
} else {
|
||||
serializer(ar, ip, port, flags);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::vector<NetworkAddress> NetworkAddressList;
|
||||
|
||||
std::string toIPString(uint32_t ip);
|
||||
std::string toIPVectorString(std::vector<uint32_t> ips);
|
||||
|
||||
template <class T> class Future;
|
||||
|
|
Loading…
Reference in New Issue