Listen to multiple addresses and start using vector<NetworkAdddress> in Endpoint

- This patch will make FDB listen to multiple addresses given via
  command line. Although, we'll still use first address in most places,
  this patch starts using vector<NetworkAddress> in Endpoint at some basic
  places.
- When sending packets to an endpoint, pick a random network address in
  endpoints
- Renames Endpoint::address to Endpoint::addresses since it
  now holds a vector of addresses.
This commit is contained in:
Vishesh Yadav 2018-10-30 13:44:37 -07:00
parent 43e5a46f9b
commit 3eb9b23024
33 changed files with 201 additions and 144 deletions

View File

@ -36,7 +36,7 @@ struct ClientWorkerInterface {
bool operator == (ClientWorkerInterface const& r) const { return id() == r.id(); }
bool operator != (ClientWorkerInterface const& r) const { return id() != r.id(); }
UID id() const { return reboot.getEndpoint().token; }
NetworkAddress address() const { return reboot.getEndpoint().address[0]; }
NetworkAddress address() const { return reboot.getEndpoint().getPrimaryAddress(); }
template <class Ar>
void serialize( Ar& ar ) {

View File

@ -39,7 +39,7 @@ struct ClusterInterface {
bool operator == (ClusterInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterInterface const& r) const { return id() != r.id(); }
UID id() const { return openDatabase.getEndpoint().token; }
NetworkAddress address() const { return openDatabase.getEndpoint().address[0]; }
NetworkAddress address() const { return openDatabase.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
openDatabase.getEndpoint( TaskClusterController );

View File

@ -44,10 +44,10 @@ ACTOR Future<Void> failureMonitorClientLoop(
state double before = now();
state double waitfor = 0;
monitor->setStatus(controller.failureMonitoring.getEndpoint().address[0], FailureStatus(false));
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address[0] );
monitor->setStatus(controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(false));
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().getPrimaryAddress() );
//The cluster controller's address (controller.failureMonitoring.getEndpoint().address[0]) is treated specially because we can declare that it is down independently
//The cluster controller's address (controller.failureMonitoring.getEndpoint().getPrimaryAddress()) is treated specially because we can declare that it is down independently
//of the response from the cluster controller. It still needs to be in knownAddrs in case the cluster controller changes, so the next cluster controller resets its state
try {
@ -59,7 +59,7 @@ ACTOR Future<Void> failureMonitorClientLoop(
requestTimeout = Never();
if (reply.allOthersFailed) {
// Reset all systems *not* mentioned in the reply to the default (failed) state
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address[0] );
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().getPrimaryAddress() );
std::set<NetworkAddress> changedAddresses;
for(int c=0; c<reply.changes.size(); c++)
changedAddresses.insert( reply.changes[c].address );
@ -73,8 +73,8 @@ ACTOR Future<Void> failureMonitorClientLoop(
if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());
monitor->setStatus( controller.failureMonitoring.getEndpoint().address[0], FailureStatus(false) );
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address[0] );
monitor->setStatus( controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(false) );
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().getPrimaryAddress() );
//if (version != reply.failureInformationVersion)
// printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed);
@ -88,7 +88,7 @@ ACTOR Future<Void> failureMonitorClientLoop(
fmState->knownAddrs.insert( reply.changes[c].address );
else
fmState->knownAddrs.erase( reply.changes[c].address );
ASSERT( reply.changes[c].address != controller.failureMonitoring.getEndpoint().address[0] || !reply.changes[c].status.failed );
ASSERT( reply.changes[c].address != controller.failureMonitoring.getEndpoint().getPrimaryAddress() || !reply.changes[c].status.failed );
}
before = now();
waitfor = reply.clientRequestIntervalMS * .001;
@ -98,8 +98,8 @@ ACTOR Future<Void> failureMonitorClientLoop(
g_network->setCurrentTask(TaskDefaultDelay);
requestTimeout = Never();
TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
monitor->setStatus( controller.failureMonitoring.getEndpoint().address[0], FailureStatus(true) );
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address[0] );
monitor->setStatus( controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(true) );
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().getPrimaryAddress() );
}
when( wait( nextRequest ) ) {
g_network->setCurrentTask(TaskDefaultDelay);

View File

@ -46,7 +46,7 @@ struct MasterProxyInterface {
std::string toString() const { return id().shortString(); }
bool operator == (MasterProxyInterface const& r) const { return id() == r.id(); }
bool operator != (MasterProxyInterface const& r) const { return id() != r.id(); }
NetworkAddress address() const { return commit.getEndpoint().address[0]; }
NetworkAddress address() const { return commit.getEndpoint().getPrimaryAddress(); }
template <class Archive>
void serialize(Archive& ar) {

View File

@ -312,7 +312,7 @@ ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, Asyn
state Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) );
wait( Future<Void>(Void()) ); // Make sure we weren't cancelled
TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().address[0]).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);
TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);
if (li != *info) {
*info = li;

View File

@ -300,7 +300,7 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
int coordinatorsUnavailable = 0;
for (int i = 0; i < leaderServers.size(); i++) {
StatusObject coordStatus;
coordStatus["address"] = coord.clientLeaderServers[i].getLeader.getEndpoint().address[0].toString();
coordStatus["address"] = coord.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress().toString();
if (leaderServers[i].isReady()){
coordStatus["reachable"] = true;

View File

@ -61,7 +61,7 @@ struct StorageServerInterface {
explicit StorageServerInterface(UID uid) : uniqueID( uid ) {}
StorageServerInterface() : uniqueID( g_random->randomUniqueID() ) {}
NetworkAddress address() const { return getVersion.getEndpoint().address[0]; }
NetworkAddress address() const { return getVersion.getEndpoint().getPrimaryAddress(); }
UID id() const { return uniqueID; }
std::string toString() const { return id().shortString(); }
template <class Ar>

View File

@ -92,7 +92,7 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
void SimpleFailureMonitor::endpointNotFound( Endpoint const& endpoint ) {
// SOMEDAY: Expiration (this "leaks" memory)
TraceEvent("EndpointNotFound").suppressFor(1.0).detail("Address", endpoint.address[0]).detail("Token", endpoint.token);
TraceEvent("EndpointNotFound").suppressFor(1.0).detail("Address", endpoint.getPrimaryAddress()).detail("Token", endpoint.token);
endpointKnownFailed.set( endpoint, true );
}
@ -103,9 +103,9 @@ void SimpleFailureMonitor::notifyDisconnect( NetworkAddress const& address ) {
Future<Void> SimpleFailureMonitor::onDisconnectOrFailure( Endpoint const& endpoint ) {
// If the endpoint or address is already failed, return right away
auto i = addressStatus.find(endpoint.address[0]);
auto i = addressStatus.find(endpoint.getPrimaryAddress());
if (i == addressStatus.end() || i->value.isFailed() || endpointKnownFailed.get(endpoint)) {
TraceEvent("AlreadyDisconnected").detail("Addr", endpoint.address[0]).detail("Tok", endpoint.token);
TraceEvent("AlreadyDisconnected").detail("Addr", endpoint.getPrimaryAddress()).detail("Tok", endpoint.token);
return Void();
}
@ -131,17 +131,17 @@ FailureStatus SimpleFailureMonitor::getState( Endpoint const& endpoint ) {
if (endpointKnownFailed.get(endpoint))
return FailureStatus(true);
else {
auto a = addressStatus.find(endpoint.address[0]);
auto a = addressStatus.find(endpoint.getPrimaryAddress());
if (a == addressStatus.end()) return FailureStatus();
else return a->value;
//printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.address[0].toString(), a.failed ? "FAILED" : "OK", this);
//printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.getPrimaryAddress().toString(), a.failed ? "FAILED" : "OK", this);
}
}
bool SimpleFailureMonitor::onlyEndpointFailed( Endpoint const& endpoint ) {
if(!endpointKnownFailed.get(endpoint))
return false;
auto a = addressStatus.find(endpoint.address[0]);
auto a = addressStatus.find(endpoint.getPrimaryAddress());
if (a == addressStatus.end()) return true;
else return !a->value.failed;
}

View File

@ -89,7 +89,7 @@ public:
// The next time the known status for the endpoint changes, returns the new status.
virtual Future<Void> onStateChanged( Endpoint const& endpoint ) = 0;
// Returns when onFailed(endpoint) || transport().onDisconnect( endpoint.address[0] ), but more efficiently
// Returns when onFailed(endpoint) || transport().onDisconnect( endpoint.getPrimaryAddress() ), but more efficiently
virtual Future<Void> onDisconnectOrFailure( Endpoint const& endpoint ) = 0;
// Returns true if the endpoint is failed but the address of the endpoint is not failed.

View File

@ -309,7 +309,7 @@ TEST_CASE("/flow/flow/networked futures")
BinaryWriter wr(IncludeVersion());
wr << locInt;
ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().address[0] == FlowTransport::transport().getLocalAddress());
ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().getPrimaryAddress() == FlowTransport::transport().getLocalAddress());
BinaryReader rd(wr.toStringRef(), IncludeVersion());
RequestStream<int> remoteInt;

View File

@ -163,9 +163,19 @@ public:
struct Peer* getPeer( NetworkAddress const& address, bool openConnection = true );
NetworkAddress localAddress;
// TODO (Vishesh) : Now make it make sense!
const NetworkAddress& getFirstLocalAddress() const {
return localListeners.begin()->first;
}
// Returns a vector of NetworkAddresses we are listening to.
NetworkAddressList getLocalAddresses() const;
// Returns true if given network address 'address' is one of the address we are listening on.
bool isLocalAddress(const NetworkAddress& address) const;
std::map<NetworkAddress, Future<Void>> localListeners;
std::map<NetworkAddress, struct Peer*> peers;
Future<Void> listen;
bool warnAlwaysForLargePacket;
// These declarations must be in exactly this order
@ -248,13 +258,14 @@ struct Peer : NonCopyable {
void prependConnectPacket() {
// Send the ConnectPacket expected at the beginning of a new connection
ConnectPacket pkt;
if (transport->localAddress.isTLS() != destination.isTLS()) {
const NetworkAddress& localAddress = transport->getFirstLocalAddress();
if (localAddress.isTLS() != destination.isTLS()) {
pkt.canonicalRemotePort = 0; // a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
pkt.canonicalRemoteIp = 0;
}
else {
pkt.canonicalRemotePort = transport->localAddress.port;
pkt.canonicalRemoteIp = transport->localAddress.ip;
pkt.canonicalRemotePort = localAddress.port;
pkt.canonicalRemoteIp = localAddress.ip;
}
pkt.connectPacketLength = sizeof(pkt)-sizeof(pkt.connectPacketLength);
pkt.protocolVersion = currentProtocolVersion;
@ -282,7 +293,7 @@ struct Peer : NonCopyable {
// In case two processes are trying to connect to each other simultaneously, the process with the larger canonical NetworkAddress
// gets to keep its outgoing connection.
if ( !destination.isPublic() && !outgoingConnectionIdle ) throw address_in_use();
if ( !destination.isPublic() || outgoingConnectionIdle || destination > transport->localAddress ) {
if ( !destination.isPublic() || outgoingConnectionIdle || destination > transport->getFirstLocalAddress() ) { //TODO (Vishesh): Last condition.
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())
.suppressFor(1.0)
@ -463,20 +474,20 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade
auto receiver = self->endpoints.get(destination.token);
if (receiver) {
try {
g_currentDeliveryPeerAddress = destination.address[0];
g_currentDeliveryPeerAddress = destination.getPrimaryAddress();
receiver->receive( reader );
g_currentDeliveryPeerAddress = NetworkAddress();
} catch (Error& e) {
g_currentDeliveryPeerAddress = NetworkAddress();
TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.address[0]);
TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.getPrimaryAddress());
throw;
}
} else if (destination.token.first() & TOKEN_STREAM_FLAG) {
// We don't have the (stream) endpoint 'token', notify the remote machine
if (destination.token.first() != -1)
sendPacket( self,
SerializeSource<Endpoint>( Endpoint( {self->localAddress}, destination.token ) ),
Endpoint( {destination.address[0]}, WLTOKEN_ENDPOINT_NOT_FOUND),
SerializeSource<Endpoint>( Endpoint( self->getLocalAddresses(), destination.token ) ),
Endpoint( destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND),
false, true );
}
@ -491,7 +502,7 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin,
uint8_t* p = unprocessed_begin;
bool checksumEnabled = true;
if (transport->localAddress.isTLS() || peerAddress.isTLS()) {
if (transport->getFirstLocalAddress().isTLS() || peerAddress.isTLS()) {
checksumEnabled = false;
}
@ -688,7 +699,7 @@ ACTOR static Future<Void> connectionReader(
peer->transport->numIncompatibleConnections++;
incompatiblePeerCounted = true;
}
ASSERT( p->canonicalRemotePort == peerAddress.port );
// ASSERT( p->canonicalRemotePort == peerAddress.port );
} else {
if (p->canonicalRemotePort) {
peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() );
@ -783,6 +794,18 @@ Peer* TransportData::getPeer( NetworkAddress const& address, bool openConnection
return newPeer;
}
NetworkAddressList TransportData::getLocalAddresses() const {
NetworkAddressList addresses;
for (const auto& addr_listener : localListeners) {
addresses.push_back(addr_listener.first);
}
return addresses;
}
bool TransportData::isLocalAddress(const NetworkAddress& address) const {
return localListeners.find(address) != localListeners.end();
}
ACTOR static Future<Void> multiVersionCleanupWorker( TransportData* self ) {
loop {
wait(delay(FLOW_KNOBS->CONNECTION_CLEANUP_DELAY));
@ -812,7 +835,14 @@ FlowTransport::~FlowTransport() { delete self; }
void FlowTransport::initMetrics() { self->initMetrics(); }
NetworkAddress FlowTransport::getLocalAddress() { return self->localAddress; }
NetworkAddress FlowTransport::getLocalAddress() const {
//TODO (Vishesh) : Consider all addresses.
const NetworkAddressList& addresses = self->getLocalAddresses();
if (addresses.empty()) {
return NetworkAddress();
}
return self->getLocalAddresses()[0];
}
std::map<NetworkAddress, std::pair<uint64_t, double>>* FlowTransport::getIncompatiblePeers() {
for(auto it = self->incompatiblePeers.begin(); it != self->incompatiblePeers.end();) {
@ -827,39 +857,48 @@ std::map<NetworkAddress, std::pair<uint64_t, double>>* FlowTransport::getIncompa
Future<Void> FlowTransport::bind( NetworkAddress publicAddress, NetworkAddress listenAddress ) {
ASSERT( publicAddress.isPublic() );
self->localAddress = publicAddress;
ASSERT( self->localListeners.find(publicAddress) == self->localListeners.end() );
TraceEvent("Binding").detail("PublicAddress", publicAddress).detail("ListenAddress", listenAddress);
self->listen = listen( self, listenAddress );
return self->listen;
Future<Void> listenF = listen( self, listenAddress );
self->localListeners[publicAddress] = listenF;
return listenF;
}
void FlowTransport::loadedEndpoint( Endpoint& endpoint ) {
if (endpoint.address.size() > 0 && endpoint.address[0].isValid()) return;
if (endpoint.getPrimaryAddress().isValid()) return;
ASSERT( !(endpoint.token.first() & TOKEN_STREAM_FLAG) ); // Only reply promises are supposed to be unaddressed
ASSERT( g_currentDeliveryPeerAddress.isValid() );
endpoint.address = {g_currentDeliveryPeerAddress};
endpoint.addresses = {g_currentDeliveryPeerAddress};
}
void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.address[0].isValid()) return;
Peer* peer = self->getPeer(endpoint.address[0]);
if(peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
for (const NetworkAddress& address : endpoint.addresses) {
Peer* peer = self->getPeer(address);
if(peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;
}
}
}
void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.address[0].isValid()) return;
Peer* peer = self->getPeer(endpoint.address[0], false);
if(peer) {
peer->peerReferences--;
if(peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences").detail("References", peer->peerReferences).detail("Address", endpoint.address[0]).detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
peer->incompatibleDataRead.trigger();
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
for (const NetworkAddress& address : endpoint.addresses) {
Peer* peer = self->getPeer(address, false);
if(peer) {
peer->peerReferences--;
if(peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences")
.detail("References", peer->peerReferences)
.detail("Address", address)
.detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
peer->incompatibleDataRead.trigger();
}
}
}
}
@ -867,10 +906,10 @@ void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessag
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) {
endpoint.token = g_random->randomUniqueID();
if (receiver->isStream()) {
endpoint.address = {getLocalAddress()};
endpoint.addresses = self->getLocalAddresses();
endpoint.token = UID( endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second() );
} else {
endpoint.address = {NetworkAddress()};
endpoint.addresses = {NetworkAddress()};
endpoint.token = UID( endpoint.token.first() & ~TOKEN_STREAM_FLAG, endpoint.token.second() );
}
self->endpoints.insert( receiver, endpoint.token, taskID );
@ -881,7 +920,7 @@ void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageRece
}
void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) {
endpoint.address = {getLocalAddress()};
endpoint.addresses = self->getLocalAddresses();
ASSERT( ((endpoint.token.first() & TOKEN_STREAM_FLAG)!=0) == receiver->isStream() );
Endpoint::Token otoken = endpoint.token;
self->endpoints.insert( receiver, endpoint.token, taskID );
@ -889,7 +928,7 @@ void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageRece
}
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ) {
if (destination.address[0] == self->localAddress) {
if (self->isLocalAddress(destination.getPrimaryAddress())) {
TEST(true); // "Loopback" delivery
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
@ -905,16 +944,16 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
return (PacketID)NULL;
} else {
bool checksumEnabled = true;
if (self->localAddress.isTLS() || destination.address[0].isTLS()) {
if (self->getFirstLocalAddress().isTLS() || destination.getPrimaryAddress().isTLS()) {
checksumEnabled = false;
}
++self->countPacketsGenerated;
Peer* peer = self->getPeer(destination.address[0], openConnection);
Peer* peer = self->getPeer(destination.getRandomAddress(), openConnection);
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.address[0].isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection
return (PacketID)NULL;
}
@ -970,13 +1009,13 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
}
if (len > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.address[0]).detail("Length", (int)len);
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
// throw platform_error(); // FIXME: How to recover from this situation?
}
else if (len > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
.suppressFor(1.0)
.detail("ToPeer", destination.address[0])
.detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)
.detail("Token", destination.token)
.backtrace();

View File

@ -31,17 +31,29 @@ public:
// Endpoint represents a particular service (e.g. a serialized Promise<T> or PromiseStream<T>)
// An endpoint is either "local" (used for receiving data) or "remote" (used for sending data)
typedef UID Token;
NetworkAddressList address;
NetworkAddressList addresses;
Token token;
Endpoint() {}
Endpoint(const NetworkAddressList& addresses, Token token) : address(addresses), token(token) {}
Endpoint() : addresses({NetworkAddress()}) {}
Endpoint(const NetworkAddressList& addresses, Token token) : addresses(addresses), token(token) {
ASSERT(addresses.size() > 0);
}
bool isValid() const { return token.isValid(); }
bool isLocal() const;
// Return the primary network address, which is the first network address among
// all addresses this endpoint listens to.
const NetworkAddress& getPrimaryAddress() const {
return addresses[0];
}
const NetworkAddress& getRandomAddress() const {
return addresses[g_random->randomChoice(addresses)];
}
bool operator == (Endpoint const& r) const {
return address == r.address && token == r.token;
return addresses == r.addresses && token == r.token;
}
bool operator != (Endpoint const& r) const {
return !(*this == r);
@ -49,8 +61,8 @@ public:
//TODO: (Vishesh) Figure out what to do for vector of addresses this.
bool operator < (Endpoint const& r) const {
const NetworkAddress& left = address.empty() ? NetworkAddress() : address[0];
const NetworkAddress& right = r.address.empty() ? NetworkAddress() : r.address[0];
const NetworkAddress& left = addresses[0];
const NetworkAddress& right = r.addresses[0];
if (left != right)
return left < right;
else
@ -60,10 +72,9 @@ public:
template <class Ar>
void serialize(Ar& ar) {
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061020002LL) {
address.emplace_back();
ar & address[0] & token;
ar & addresses[0] & token;
} else {
ar & address & token;
ar & addresses & token;
}
}
};
@ -97,8 +108,8 @@ public:
// Starts a server listening on the given listenAddress, and sets publicAddress to be the public
// address of this server. Returns only errors.
NetworkAddress getLocalAddress();
// Returns the NetworkAddress that would be assigned by addEndpoint (the public address)
NetworkAddress getLocalAddress() const;
// Returns first local NetworkAddress.
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
@ -151,7 +162,7 @@ private:
};
inline bool Endpoint::isLocal() const {
return address[0] == FlowTransport::transport().getLocalAddress();
return addresses[0] == FlowTransport::transport().getLocalAddress();
}
#endif

View File

@ -30,9 +30,6 @@
struct FlowReceiver : private NetworkMessageReceiver {
// Common endpoint code for NetSAV<> and NetNotifiedQueue<>
Endpoint endpoint;
bool m_isLocalEndpoint;
FlowReceiver() : m_isLocalEndpoint(false) {}
FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) {
FlowTransport::transport().addPeerReference(endpoint, this);
@ -64,6 +61,10 @@ struct FlowReceiver : private NetworkMessageReceiver {
endpoint.token = token;
FlowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID);
}
private:
Endpoint endpoint;
bool m_isLocalEndpoint;
};
template <class T>
@ -151,7 +152,7 @@ template <class Ar, class T>
void save(Ar& ar, const ReplyPromise<T>& value) {
auto const& ep = value.getEndpoint();
ar << ep;
ASSERT(!ep.address[0].isValid() || ep.address[0].isPublic()); // No re-serializing non-public addresses (the reply connection won't be available to any other process)
ASSERT(!ep.getPrimaryAddress().isValid() || ep.getPrimaryAddress().isPublic()); // No re-serializing non-public addresses (the reply connection won't be available to any other process)
}
template <class Ar, class T>
@ -357,7 +358,7 @@ template <class Ar, class T>
void save(Ar& ar, const RequestStream<T>& value) {
auto const& ep = value.getEndpoint();
ar << ep;
UNSTOPPABLE_ASSERT(ep.address[0].isValid()); // No serializing PromiseStreams on a client with no public address
UNSTOPPABLE_ASSERT(ep.getPrimaryAddress().isValid()); // No serializing PromiseStreams on a client with no public address
}
template <class Ar, class T>

View File

@ -139,7 +139,7 @@ public:
return m;
}
ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.address[0]); }
ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.getPrimaryAddress()); }
ProcessInfo* getCurrentProcess() { return currentProcess; }
virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;
virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;

View File

@ -1156,7 +1156,7 @@ ACTOR Future<Void> clusterGetServerInfo(
ReplyPromise<ServerDBInfo> reply)
{
state UID issueID;
addIssue( db->workersWithIssues, reply.getEndpoint().address[0], issues, issueID );
addIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
for(auto it : incompatiblePeers) {
db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
@ -1168,7 +1168,7 @@ ACTOR Future<Void> clusterGetServerInfo(
}
}
removeIssue( db->workersWithIssues, reply.getEndpoint().address[0], issues, issueID );
removeIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
reply.send( db->serverInfo->get() );
return Void();
@ -1184,13 +1184,13 @@ ACTOR Future<Void> clusterOpenDatabase(
{
// NOTE: The client no longer expects this function to return errors
state UID issueID;
addIssue( db->clientsWithIssues, reply.getEndpoint().address[0], issues, issueID );
addIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
if(supportedVersions.size() > 0) {
db->clientVersionMap[reply.getEndpoint().address[0]] = supportedVersions;
db->clientVersionMap[reply.getEndpoint().getPrimaryAddress()] = supportedVersions;
}
db->traceLogGroupMap[reply.getEndpoint().address[0]] = traceLogGroup.toString();
db->traceLogGroupMap[reply.getEndpoint().getPrimaryAddress()] = traceLogGroup.toString();
while (db->clientInfo->get().id == knownClientInfoID) {
choose {
@ -1199,9 +1199,9 @@ ACTOR Future<Void> clusterOpenDatabase(
}
}
removeIssue( db->clientsWithIssues, reply.getEndpoint().address[0], issues, issueID );
db->clientVersionMap.erase(reply.getEndpoint().address[0]);
db->traceLogGroupMap.erase(reply.getEndpoint().address[0]);
removeIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
db->clientVersionMap.erase(reply.getEndpoint().getPrimaryAddress());
db->traceLogGroupMap.erase(reply.getEndpoint().getPrimaryAddress());
reply.send( db->clientInfo->get() );
return Void();
@ -1371,7 +1371,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
when ( FailureMonitoringRequest req = waitNext( requests ) ) {
if ( req.senderStatus.present() ) {
// Update the status of requester, if necessary
auto& address = req.reply.getEndpoint().address[0];
auto& address = req.reply.getEndpoint().getPrimaryAddress();
auto& stat = currentStatus[ address ];
auto& newStat = req.senderStatus.get();

View File

@ -128,7 +128,7 @@ ACTOR Future<Void> localGenerationReg( GenerationRegInterface interf, OnDemandSt
// SOMEDAY: concurrent access to different keys?
loop choose {
when ( GenerationRegReadRequest _req = waitNext( interf.read.getFuture() ) ) {
TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().address[0]).detail("K", printable(_req.key));
TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().getPrimaryAddress()).detail("K", printable(_req.key));
state GenerationRegReadRequest req = _req;
Optional<Value> rawV = wait( store->readValue( req.key ) );
v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
@ -149,11 +149,11 @@ ACTOR Future<Void> localGenerationReg( GenerationRegInterface interf, OnDemandSt
v.val = wrq.kv.value;
store->set( KeyValueRef( wrq.kv.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
wait(store->commit());
TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().address[0]).detail("Key", printable(wrq.kv.key))
TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", printable(wrq.kv.key))
.detail("ReqGen", wrq.gen.generation).detail("Returning", v.writeGen.generation);
wrq.reply.send( v.writeGen );
} else {
TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().address[0]).detail("Key", printable(wrq.kv.key))
TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", printable(wrq.kv.key))
.detail("ReqGen", wrq.gen.generation).detail("ReadGen", v.readGen.generation).detail("WriteGen", v.writeGen.generation);
wrq.reply.send( std::max( v.readGen, v.writeGen ) );
}
@ -449,7 +449,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
state GenerationRegInterface myInterface( g_network );
state OnDemandStore store( dataFolder, myID );
TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().address[0]).detail("Folder", dataFolder);
TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder);
try {
wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() );

View File

@ -1814,7 +1814,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
allServers.push_back( newServer.id() );
TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("Address", newServer.waitFailure.getEndpoint().address[0]);
TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("Address", newServer.waitFailure.getEndpoint().getPrimaryAddress());
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass, includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end(), storageServerSet ) );
// Establish the relation between server and machine
@ -2606,7 +2606,7 @@ ACTOR Future<Void> storageServerTracker(
return Void();
}
when( std::pair<StorageServerInterface, ProcessClass> newInterface = wait( interfaceChanged ) ) {
bool restartRecruiting = newInterface.first.waitFailure.getEndpoint().address[0] != server->lastKnownInterface.waitFailure.getEndpoint().address[0];
bool restartRecruiting = newInterface.first.waitFailure.getEndpoint().getPrimaryAddress() != server->lastKnownInterface.waitFailure.getEndpoint().getPrimaryAddress();
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();

View File

@ -199,11 +199,11 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
when ( wait( delay(SERVER_KNOBS->POLLING_FREQUENCY) ) ) {
for(int i = 0; i < coordinators.leaderElectionServers.size(); ++i) {
if(true_heartbeats[i].isReady())
TraceEvent("LeaderTrueHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]);
TraceEvent("LeaderTrueHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
else if(false_heartbeats[i].isReady())
TraceEvent("LeaderFalseHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]);
TraceEvent("LeaderFalseHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
else
TraceEvent("LeaderNoHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]);
TraceEvent("LeaderNoHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
}
TraceEvent("ReleasingLeadership", myInfo.changeID);
break;

View File

@ -284,7 +284,7 @@ Version poppedVersion( LogRouterData* self, Tag tag) {
ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest req ) {
state BinaryWriter messages(Unversioned());
//TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().address[0]).detail("Ver", self->version.get()).detail("Begin", req.begin);
//TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().getPrimaryAddress()).detail("Ver", self->version.get()).detail("Begin", req.begin);
if( req.returnIfBlocked && self->version.get() < req.begin ) {
//TraceEvent("LogRouterPeek2", self->dbgid);
req.reply.sendError(end_of_stream());

View File

@ -37,7 +37,7 @@ struct MasterInterface {
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
RequestStream< struct GetCommitVersionRequest > getCommitVersion;
NetworkAddress address() const { return changeCoordinators.getEndpoint().address[0]; }
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
UID id() const { return changeCoordinators.getEndpoint().token; }
template <class Archive>

View File

@ -921,7 +921,7 @@ namespace oldTLog {
persistTagMessagesKey(logData->logId, oldTag, req.begin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto &kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
@ -945,7 +945,7 @@ namespace oldTLog {
messages.serializeBytes( messages2.toStringRef() );
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
Version poppedVer = poppedVersion(logData, oldTag);
@ -960,7 +960,7 @@ namespace oldTLog {
reply.messages = messages.toStringRef();
reply.end = endVersion;
}
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address[0]);
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];

View File

@ -75,7 +75,7 @@ ACTOR Future<Void> resolveBatch(
state Optional<UID> debugID;
// The first request (prevVersion < 0) comes from the master
state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().address[0] : NetworkAddress();
state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();
state ProxyRequestsInfo &proxyInfo = self->proxyInfoMap[proxyAddress];
if(req.debugID.present()) {

View File

@ -41,7 +41,7 @@ struct ResolverInterface {
std::string toString() const { return id().shortString(); }
bool operator == ( ResolverInterface const& r ) const { return id() == r.id(); }
bool operator != ( ResolverInterface const& r ) const { return id() != r.id(); }
NetworkAddress address() const { return resolve.getEndpoint().address[0]; }
NetworkAddress address() const { return resolve.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
metrics.getEndpoint( TaskResolutionMetrics );
split.getEndpoint( TaskResolutionMetrics );

View File

@ -33,7 +33,7 @@ struct RestoreInterface {
bool operator == (RestoreInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreInterface const& r) const { return id() != r.id(); }
UID id() const { return test.getEndpoint().token; }
NetworkAddress address() const { return test.getEndpoint().address[0]; }
NetworkAddress address() const { return test.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
test.getEndpoint( TaskClusterController );

View File

@ -53,7 +53,7 @@ struct TLogInterface {
UID getSharedTLogID() const { return sharedTLogID; }
std::string toString() const { return id().shortString(); }
bool operator == ( TLogInterface const& r ) const { return id() == r.id(); }
NetworkAddress address() const { return peekMessages.getEndpoint().address[0]; }
NetworkAddress address() const { return peekMessages.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
getQueuingMetrics.getEndpoint( TaskTLogQueuingMetrics );
popMessages.getEndpoint( TaskTLogPop );

View File

@ -1036,7 +1036,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto &kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
@ -1050,7 +1050,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages.serializeBytes( messages2.toStringRef() );
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
@ -1059,7 +1059,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
reply.messages = messages.toStringRef();
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address[0]);
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];

View File

@ -2075,14 +2075,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if ( pos != -1 ) {
TraceEvent("TLogJoinedMe", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().address[0].toString());
TraceEvent("TLogJoinedMe", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
if( !logServers[pos]->get().present() || req.myInterface.commit.getEndpoint() != logServers[pos]->get().interf().commit.getEndpoint())
logServers[pos]->setUnconditional( OptionalInterface<TLogInterface>(req.myInterface) );
lastReply[req.myInterface.id()].send(false);
lastReply[req.myInterface.id()] = req.reply;
}
else {
TraceEvent("TLogJoinedMeUnknown", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().address[0].toString());
TraceEvent("TLogJoinedMeUnknown", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
req.reply.send(true);
}
}

View File

@ -55,7 +55,7 @@ struct WorkerInterface {
TesterInterface testerInterface;
UID id() const { return tLog.getEndpoint().token; }
NetworkAddress address() const { return tLog.getEndpoint().address[0]; }
NetworkAddress address() const { return tLog.getEndpoint().getPrimaryAddress(); }
WorkerInterface() {}
WorkerInterface( LocalityData locality ) : locality( locality ) {}

View File

@ -1456,15 +1456,21 @@ int main(int argc, char* argv[]) {
tlsOptions->register_network();
#endif
if (role == FDBD || role == NetworkTestServer || role == Restore) {
try {
listenErrors.push_back(FlowTransport::transport().bind(publicAddresses[0], listenAddresses[0]));
if (listenErrors[0].isReady()) listenErrors[0].get();
} catch (Error& e) {
TraceEvent("BindError").error(e);
fprintf(stderr, "Error initializing networking with public address %s and listen address %s (%s)\n", publicAddresses[0].toString().c_str(), listenAddresses[0].toString().c_str(), e.what());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
if (role == FDBD || role == NetworkTestServer) {
for (int ii = 0; ii < publicAddresses.size(); ++ii) {
const NetworkAddress& publicAddress = publicAddresses[ii];
const NetworkAddress& listenAddress = listenAddresses[ii];
try {
const Future<Void>& errorF = FlowTransport::transport().bind(publicAddress, listenAddress);
listenErrors.push_back(errorF);
if (errorF.isReady()) errorF.get();
} catch (Error& e) {
TraceEvent("BindError").error(e);
fprintf(stderr, "Error initializing networking with public address %s and listen address %s (%s)\n",
publicAddress.toString().c_str(), listenAddress.toString().c_str(), e.what());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
}
}

View File

@ -704,7 +704,7 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
TraceEvent(SevError, "TestFailure")
.error(metricTasks[i].getError())
.detail("Reason", "Metrics not retrieved")
.detail("From", workloads[i].metrics.getEndpoint().address[0]);
.detail("From", workloads[i].metrics.getEndpoint().getPrimaryAddress());
}
}

View File

@ -511,7 +511,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
when( wait( ccInterface->onChange() ) ) {
if(ccInterface->get().present())
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().address[0]);
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
}
}
}

View File

@ -150,7 +150,7 @@ struct PingWorkload : TestWorkload {
loop {
wait( poisson( &lastTime, self->actorCount / self->operationsPerSecond ) );
auto& peer = g_random->randomChoice(peers);
state NetworkAddress addr = peer.getEndpoint().address[0];
state NetworkAddress addr = peer.getEndpoint().getPrimaryAddress();
state double before = now();
LoadedPingRequest req;
@ -251,7 +251,7 @@ struct PingWorkload : TestWorkload {
req.payload = self->payloadOut;
req.loadReply = true;
replies.push_back( success( peers[i].getReply( req ) ) );
// replies.push_back( self->receptionLogger( self, peers[i].payloadPing.getReply( req ), peers[i].payloadPing.getEndpoint().address[0], pingId ) );
// replies.push_back( self->receptionLogger( self, peers[i].payloadPing.getReply( req ), peers[i].payloadPing.getEndpoint().getPrimaryAddress(), pingId ) );
// peers[i].payloadPing.send( req );
// replies.push_back( self->payloadDelayer( req, peers[i].payloadPing ) );
}

View File

@ -65,8 +65,8 @@ struct TargetedKillWorkload : TestWorkload {
int killed = 0;
for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.master.getEndpoint().address[0] == address ||
( self->killAllMachineProcesses && workers[i].first.master.getEndpoint().address[0].ip == address.ip && workers[i].second != ProcessClass::TesterClass ) ) {
if( workers[i].first.master.getEndpoint().getPrimaryAddress() == address ||
( self->killAllMachineProcesses && workers[i].first.master.getEndpoint().getPrimaryAddress().ip == address.ip && workers[i].second != ProcessClass::TesterClass ) ) {
TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].first.id());
workers[i].first.clientInterface.reboot.send( RebootRequest() );
}
@ -94,7 +94,7 @@ struct TargetedKillWorkload : TestWorkload {
for( int i = 0; i < proxies->size(); i++) {
MasterProxyInterface mpi = proxies->getInterface(o);
machine = mpi.address();
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0])
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress())
break;
o = ++o%proxies->size();
}
@ -105,7 +105,7 @@ struct TargetedKillWorkload : TestWorkload {
for( int i = 0; i < tlogs.size(); i++) {
TLogInterface tli = tlogs[o];
machine = tli.address();
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0])
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress())
break;
o = ++o%tlogs.size();
}
@ -115,13 +115,13 @@ struct TargetedKillWorkload : TestWorkload {
for( int i = 0; i < storageServers.size(); i++) {
StorageServerInterface ssi = storageServers[o];
machine = ssi.address();
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0])
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress())
break;
o = ++o%storageServers.size();
}
}
else if( self->machineToKill == "clustercontroller" || self->machineToKill == "cc" ) {
machine = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0];
machine = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress();
}
TraceEvent("IsolatedMark").detail("TargetedMachine", machine).detail("Role", self->machineToKill);