diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp
index b0d0a1ca43..e4a3ae1ca1 100644
--- a/fdbrpc/FlowTransport.actor.cpp
+++ b/fdbrpc/FlowTransport.actor.cpp
@@ -365,7 +365,10 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
 		lastAddress = *it;
 		auto peer = self->getPeer(lastAddress);
 		if (!peer) {
-			TraceEvent(SevWarnAlways, "MissingNetworkAddress").suppressFor(10.0).detail("PeerAddr", lastAddress);
+			TraceEvent(SevWarnAlways, "MissingNetworkAddress")
+			    .suppressFor(10.0)
+			    .detail("PeerAddr", lastAddress)
+			    .detail("PeerAddress", lastAddress);
 		}
 		if (peer->lastLoggedTime <= 0.0) {
 			peer->lastLoggedTime = peer->lastConnectTime;
@@ -376,6 +379,7 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
 			TraceEvent("PingLatency")
 			    .detail("Elapsed", now() - peer->lastLoggedTime)
 			    .detail("PeerAddr", lastAddress)
+			    .detail("PeerAddress", lastAddress)
 			    .detail("MinLatency", peer->pingLatencies.min())
 			    .detail("MaxLatency", peer->pingLatencies.max())
 			    .detail("MeanLatency", peer->pingLatencies.mean())
@@ -649,6 +653,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
                                     Future<Void> reader = Void()) {
 	TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
 	    .detail("PeerAddr", self->destination)
+	    .detail("PeerAddress", self->destination)
 	    .detail("ConnSet", (bool)conn);
 	ASSERT_WE_THINK(FlowTransport::transport().getLocalAddress() != self->destination);
 
@@ -691,6 +696,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 				TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID())
 				    .suppressFor(1.0)
 				    .detail("PeerAddr", self->destination)
+				    .detail("PeerAddress", self->destination)
 				    .detail("PeerReferences", self->peerReferences)
 				    .detail("FailureStatus",
 				            IFailureMonitor::failureMonitor().getState(self->destination).isAvailable() ? "OK"
@@ -722,7 +728,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 
 						TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
 						    .suppressFor(1.0)
-						    .detail("PeerAddr", self->destination);
+						    .detail("PeerAddr", self->destination)
+						    .detail("PeerAddress", self->destination);
 						self->prependConnectPacket();
 						reader = connectionReader(self->transport, conn, self, Promise<Reference<Peer>>());
 					}
@@ -737,7 +744,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 					}
 					TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID())
 					    .suppressFor(1.0)
-					    .detail("PeerAddr", self->destination);
+					    .detail("PeerAddr", self->destination)
+					    .detail("PeerAddress", self->destination);
 
 					throw;
 				}
@@ -756,7 +764,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 					                 self->resetConnection.onTrigger());
 					TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID())
 					    .suppressFor(1.0)
-					    .detail("PeerAddr", self->destination);
+					    .detail("PeerAddr", self->destination)
+					    .detail("PeerAddress", self->destination);
 					throw connection_failed();
 				} catch (Error& e) {
 					if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
@@ -782,7 +791,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 				if (now() - firstConnFailedTime.get() > FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT) {
 					TraceEvent(SevWarnAlways, "PeerUnavailableForLongTime", conn ? conn->getDebugID() : UID())
 					    .suppressFor(1.0)
-					    .detail("PeerAddr", self->destination);
+					    .detail("PeerAddr", self->destination)
+					    .detail("PeerAddress", self->destination);
 					firstConnFailedTime = now() - FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT / 2.0;
 				}
 			} else {
@@ -811,13 +821,15 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 				TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID())
 				    .errorUnsuppressed(e)
 				    .suppressFor(1.0)
-				    .detail("PeerAddr", self->destination);
+				    .detail("PeerAddr", self->destination)
+				    .detail("PeerAddress", self->destination);
 			} else {
 				TraceEvent(
 				    ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID())
 				    .errorUnsuppressed(e)
 				    .suppressFor(1.0)
-				    .detail("PeerAddr", self->destination);
+				    .detail("PeerAddr", self->destination)
+				    .detail("PeerAddress", self->destination);
 
 				// Since the connection has closed, we need to check the protocol version the next time we connect
 				self->compatible = true;
@@ -832,7 +844,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 			} else if (now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
 				TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID())
 				    .suppressFor(5.0)
-				    .detail("PeerAddr", self->destination);
+				    .detail("PeerAddr", self->destination)
+				    .detail("PeerAddress", self->destination);
 				self->transport->degraded->set(true);
 			}
 			it.second = now();
@@ -877,7 +890,11 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
 			if (self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty() &&
 			    self->outstandingReplies == 0) {
-				TraceEvent("PeerDestroy").errorUnsuppressed(e).suppressFor(1.0).detail("PeerAddr", self->destination);
+				TraceEvent("PeerDestroy")
+				    .errorUnsuppressed(e)
+				    .suppressFor(1.0)
+				    .detail("PeerAddr", self->destination)
+				    .detail("PeerAddress", self->destination);
 				self->connect.cancel();
 				self->transport->peers.erase(self->destination);
 				self->transport->orderedAddresses.erase(self->destination);
@@ -1063,7 +1080,8 @@ ACTOR static void deliver(TransportData* self,
 			TraceEvent(SevError, "ReceiverError")
 			    .error(e)
 			    .detail("Token", destination.token.toString())
-			    .detail("Peer", destination.getPrimaryAddress());
+			    .detail("Peer", destination.getPrimaryAddress())
+			    .detail("PeerAddress", destination.getPrimaryAddress());
 			if (!FlowTransport::isClient()) {
 				flushAndExit(FDB_EXIT_ERROR);
 			}
@@ -1360,6 +1378,10 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
 					    .detail("Peer",
 					            pkt.canonicalRemotePort ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
 					                                    : conn->getPeerAddress())
+					    .detail("PeerAddress",
+					            pkt.canonicalRemotePort
+					                ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
+					                : conn->getPeerAddress())
 					    .detail("ConnectionId", connectionId);
 					transport->lastIncompatibleMessage = now();
 				}
@@ -1385,6 +1407,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
 					TraceEvent("ConnectionEstablished", conn->getDebugID())
 					    .suppressFor(1.0)
 					    .detail("Peer", conn->getPeerAddress())
+					    .detail("PeerAddress", conn->getPeerAddress())
 					    .detail("ConnectionId", connectionId);
 				}
 
@@ -1400,7 +1423,9 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
 					// Outgoing connection; port information should be what we expect
 					TraceEvent("ConnectedOutgoing")
 					    .suppressFor(1.0)
-					    .detail("PeerAddr", NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort));
+					    .detail("PeerAddr", NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort))
+					    .detail("PeerAddress",
+					            NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort));
 					peer->compatible = compatible;
 					if (!compatible) {
 						peer->transport->numIncompatibleConnections++;
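The FlowTransport hunks above, like the rest of this patch, apply one mechanical change: every event that already logs the peer under a `PeerAddr` or `Peer` detail now also logs the same value under a uniform `PeerAddress` key, so log tooling can filter all network events by a single field name while existing consumers of the old keys keep working. The sketch below is a minimal, self-contained illustration of the fluent-builder shape these call sites rely on during such a key migration; it is not FDB's actual `TraceEvent` implementation, and every name in it is made up.

```cpp
#include <iostream>
#include <sstream>
#include <string>

// Minimal stand-in for a fluent trace-event builder. FDB's real TraceEvent
// also handles severity, suppression, and serialization; this only shows
// the chaining shape used throughout the patch.
class MiniTraceEvent {
	std::ostringstream line;

public:
	explicit MiniTraceEvent(const std::string& type) { line << "Type=\"" << type << "\""; }

	// Each detail() appends a key/value pair and returns *this so calls chain.
	template <class T>
	MiniTraceEvent& detail(const std::string& key, const T& value) {
		line << " " << key << "=\"" << value << "\"";
		return *this;
	}

	// Emit on destruction, like a scoped logger.
	~MiniTraceEvent() { std::cout << line.str() << '\n'; }
};

int main() {
	std::string peer = "10.0.0.7:4500";
	// During a field rename, write the value under both the legacy key and
	// the new uniform key so old and new log consumers both keep working.
	MiniTraceEvent("ConnectionClosed")
	    .detail("PeerAddr", peer) // legacy key
	    .detail("PeerAddress", peer); // new, uniform key
}
```

Duplicating the value costs a few bytes per event but lets the old key be retired later without a flag day.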
diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp
index 0576c9f240..5166b75cde 100644
--- a/fdbrpc/sim2.actor.cpp
+++ b/fdbrpc/sim2.actor.cpp
@@ -361,7 +361,8 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
 			TraceEvent("SimulatedDisconnection")
 			    .detail("Phase", "Connect")
 			    .detail("Address", process->address)
-			    .detail("Peer", peerProcess->address);
+			    .detail("Peer", peerProcess->address)
+			    .detail("PeerAddress", peerProcess->address);
 			throw connection_failed();
 		}
 
@@ -583,6 +584,7 @@ private:
 		TraceEvent("ConnectionFailure", dbgid)
 		    .detail("MyAddr", process->address)
 		    .detail("PeerAddr", peerProcess->address)
+		    .detail("PeerAddress", peerProcess->address)
 		    .detail("PeerIsValid", peer.isValid())
 		    .detail("SendClosed", a > .33)
 		    .detail("RecvClosed", a < .66)
@@ -616,6 +618,7 @@ private:
 		    .detail("MyAddr", self->process->address)
 		    .detail("IsPublic", self->process->address.isPublic())
 		    .detail("PeerAddr", self->peerEndpoint)
+		    .detail("PeerAddress", self->peerEndpoint)
 		    .detail("PeerId", self->peerId)
 		    .detail("Opened", self->opened);
 		return Void();
diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp
index 1f9a3bef88..894b403a76 100644
--- a/fdbserver/CommitProxyServer.actor.cpp
+++ b/fdbserver/CommitProxyServer.actor.cpp
@@ -1031,6 +1031,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
 		for (int r = 0; r < self->pProxyCommitData->resolvers.size(); r++) {
 			TraceEvent(SevWarnAlways, "ResetResolverNetwork", self->pProxyCommitData->dbgid)
 			    .detail("PeerAddr", self->pProxyCommitData->resolvers[r].address())
+			    .detail("PeerAddress", self->pProxyCommitData->resolvers[r].address())
 			    .detail("CurrentBatch", self->localBatchNumber)
 			    .detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchLogging.get());
 			FlowTransport::transport().resetConnection(self->pProxyCommitData->resolvers[r].address());
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index e34a861069..6e66455bc3 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -1182,7 +1182,8 @@ Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
 		TraceEvent("SnapDataDistributor_ReqError")
 		    .errorUnsuppressed(reply.getError())
 		    .detail("ConvertedErrorType", e.what())
-		    .detail("Peer", stream.getEndpoint().getPrimaryAddress());
+		    .detail("Peer", stream.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", stream.getEndpoint().getPrimaryAddress());
 		throw e;
 	}
 	return Void();
@@ -1197,6 +1198,7 @@ ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req)
 			TraceEvent("SnapDataDistributor_ReqError")
 			    .errorUnsuppressed(reply.getError())
 			    .detail("Peer", stream.getEndpoint().getPrimaryAddress())
+			    .detail("PeerAddress", stream.getEndpoint().getPrimaryAddress())
 			    .detail("Retry", snapReqRetry);
 			if (reply.getError().code() != error_code_request_maybe_delivered ||
 			    ++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
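The `trySendSnapReq` hunk above retries only while the failure is `request_maybe_delivered` and a retry budget (`SNAP_NETWORK_FAILURE_RETRY_LIMIT`) remains. Below is a standalone sketch of that bounded-retry shape, with plain stand-ins for Flow's futures, `ErrorOr`, and the knob; every name here is hypothetical, not FDB API.

```cpp
#include <cstdio>
#include <optional>
#include <stdexcept>
#include <string>

// Stand-ins for Flow types and knobs; purely illustrative.
struct Error {
	std::string what;
	bool maybeDelivered; // analogous to error_code_request_maybe_delivered
};

constexpr int kRetryLimit = 10; // plays the role of SNAP_NETWORK_FAILURE_RETRY_LIMIT

// Simulated flaky transport: fails transiently twice, then succeeds.
std::optional<Error> sendOnce() {
	static int calls = 0;
	if (++calls < 3)
		return Error{ "request_maybe_delivered", true };
	return std::nullopt; // success
}

void sendWithRetry() {
	int retries = 0;
	for (;;) {
		std::optional<Error> err = sendOnce();
		if (!err) {
			std::puts("delivered");
			return;
		}
		// Retry only "maybe delivered" failures, and only up to the budget,
		// mirroring the termination condition in trySendSnapReq above.
		if (!err->maybeDelivered || ++retries > kRetryLimit)
			throw std::runtime_error(err->what);
	}
}

int main() {
	sendWithRetry();
}
```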
diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp
index 85b0d5c283..6e307bcd08 100644
--- a/fdbserver/LogRouter.actor.cpp
+++ b/fdbserver/LogRouter.actor.cpp
@@ -703,7 +703,8 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req)
 		TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid)
 		    .errorUnsuppressed(e)
 		    .detail("Tag", req.tag)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 
 		if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
 			req.reply.sendError(e);
diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp
index db251899be..82277df93c 100644
--- a/fdbserver/LogSystemPeekCursor.actor.cpp
+++ b/fdbserver/LogSystemPeekCursor.actor.cpp
@@ -43,6 +43,7 @@ ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
 	DebugLogTraceEvent(SevDebug, "SPC_StreamCreated", self->randomID)
 	    .detail("Tag", self->tag)
 	    .detail("PeerAddr", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
+	    .detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
 	    .detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token);
 	return Void();
 }
diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp
index 84e2908c67..68b5a59e0e 100644
--- a/fdbserver/OldTLogServer_4_6.actor.cpp
+++ b/fdbserver/OldTLogServer_4_6.actor.cpp
@@ -1143,7 +1143,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData)
 		self->activePeekStreams--;
 		TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
 		    .errorUnsuppressed(e)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 
 		if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
 			req.reply.sendError(e);
diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp
index 3a3fdafb10..a2885d5eb1 100644
--- a/fdbserver/OldTLogServer_6_0.actor.cpp
+++ b/fdbserver/OldTLogServer_6_0.actor.cpp
@@ -1459,7 +1459,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData)
 		self->activePeekStreams--;
 		TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
 		    .errorUnsuppressed(e)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 
 		if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
 			req.reply.sendError(e);
diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp
index dcb9ea9794..45d9959560 100644
--- a/fdbserver/OldTLogServer_6_2.actor.cpp
+++ b/fdbserver/OldTLogServer_6_2.actor.cpp
@@ -1887,7 +1887,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData)
 		self->activePeekStreams--;
 		TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
 		    .errorUnsuppressed(e)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 
 		if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
 			req.reply.sendError(e);
diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp
index fbccf0933a..2bb124c405 100644
--- a/fdbserver/TLogServer.actor.cpp
+++ b/fdbserver/TLogServer.actor.cpp
@@ -2186,7 +2186,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData)
 		TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
 		    .errorUnsuppressed(e)
 		    .detail("Tag", req.tag)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 
 		if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
 			req.reply.sendError(e);
diff --git a/fdbserver/include/fdbserver/ClusterController.actor.h b/fdbserver/include/fdbserver/ClusterController.actor.h
index 7dece0ad5b..ae89a9cd86 100644
--- a/fdbserver/include/fdbserver/ClusterController.actor.h
+++ b/fdbserver/include/fdbserver/ClusterController.actor.h
@@ -3034,7 +3034,8 @@ public:
 				TraceEvent("ClusterControllerReceivedPeerRecovering")
 				    .suppressFor(10.0)
 				    .detail("Worker", req.address)
-				    .detail("Peer", peer);
+				    .detail("Peer", peer)
+				    .detail("PeerAddress", peer);
 				health.degradedPeers.erase(peer);
 				health.disconnectedPeers.erase(peer);
 			}
@@ -3066,7 +3067,10 @@ public:
 		for (auto& [workerAddress, health] : workerHealth) {
 			for (auto it = health.degradedPeers.begin(); it != health.degradedPeers.end();) {
 				if (currentTime - it->second.lastRefreshTime > SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL) {
-					TraceEvent("WorkerPeerHealthRecovered").detail("Worker", workerAddress).detail("Peer", it->first);
+					TraceEvent("WorkerPeerHealthRecovered")
+					    .detail("Worker", workerAddress)
+					    .detail("Peer", it->first)
+					    .detail("PeerAddress", it->first);
 					health.degradedPeers.erase(it++);
 				} else {
 					++it;
@@ -3074,7 +3078,10 @@ public:
 			}
 			for (auto it = health.disconnectedPeers.begin(); it != health.disconnectedPeers.end();) {
 				if (currentTime - it->second.lastRefreshTime > SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL) {
-					TraceEvent("WorkerPeerHealthRecovered").detail("Worker", workerAddress).detail("Peer", it->first);
+					TraceEvent("WorkerPeerHealthRecovered")
+					    .detail("Worker", workerAddress)
+					    .detail("Peer", it->first)
+					    .detail("PeerAddress", it->first);
 					health.disconnectedPeers.erase(it++);
 				} else {
 					++it;
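Both `WorkerPeerHealthRecovered` loops above prune a map while walking it. `erase(it++)` is the classic idiom: the iterator advances to the next element before the erased node is destroyed, so the loop iterator is never left dangling. A self-contained sketch of that idiom follows; the container contents and expiration constant are made up for illustration.

```cpp
#include <iostream>
#include <map>
#include <string>

struct PeerHealth {
	double lastRefreshTime;
};

int main() {
	std::map<std::string, PeerHealth> degradedPeers{
		{ "10.0.0.1:4500", { 100.0 } }, // stale entry, will be pruned
		{ "10.0.0.2:4500", { 990.0 } }, // fresh entry, survives
	};
	double currentTime = 1000.0;
	double expiration = 300.0; // plays the role of CC_DEGRADED_LINK_EXPIRATION_INTERVAL

	for (auto it = degradedPeers.begin(); it != degradedPeers.end();) {
		if (currentTime - it->second.lastRefreshTime > expiration) {
			// erase(it++): 'it' moves to the next element before the old
			// node is destroyed, so the loop iterator stays valid.
			degradedPeers.erase(it++);
			// Equivalent post-C++11 spelling: it = degradedPeers.erase(it);
		} else {
			++it;
		}
	}

	for (const auto& [addr, health] : degradedPeers)
		std::cout << addr << " still degraded\n"; // prints only 10.0.0.2:4500
}
```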
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index a870befa1b..0d0e7a1fb7 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -3192,7 +3192,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
 		    .detail("Range", req.range)
 		    .detail("Begin", req.begin)
 		    .detail("End", req.end)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 	}
 
 	if (data->version.get() < req.begin) {
@@ -3239,7 +3240,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
 		    .detail("FetchVersion", feedInfo->fetchVersion)
 		    .detail("DurableFetchVersion", feedInfo->durableFetchVersion.get())
 		    .detail("DurableValidationVersion", durableValidationVersion)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 	}
 
 	if (req.end > emptyVersion + 1) {
@@ -3667,7 +3669,8 @@
 		    .detail("PopVersion", reply.popVersion)
 		    .detail("Count", reply.mutations.size())
 		    .detail("GotAll", gotAll)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 	}
 
 	// If the SS's version advanced at all during any of the waits, the read from memory may have missed some
@@ -3749,7 +3752,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req)
 		    .detail("Begin", req.begin)
 		    .detail("End", req.end)
 		    .detail("CanReadPopped", req.canReadPopped)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 	}
 
 	Version checkTooOldVersion = (!req.canReadPopped || req.end == MAX_VERSION) ? req.begin : req.end;
@@ -3791,7 +3795,8 @@
 		    .detail("End", req.end)
 		    .detail("CanReadPopped", req.canReadPopped)
 		    .detail("Version", req.begin - 1)
-		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+		    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+		    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 	}
 
 	loop {
@@ -3808,7 +3813,8 @@
 			    .detail("End", req.end)
 			    .detail("CanReadPopped", req.canReadPopped)
 			    .detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
-			    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+			    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+			    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 			}
 			removeUID = true;
 		}
@@ -3830,7 +3836,8 @@
 			    .detail("End", req.end)
 			    .detail("CanReadPopped", req.canReadPopped)
 			    .detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
-			    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
+			    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
+			    .detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
 			}
 		}
 		std::pair<ChangeFeedStreamReply, bool> _feedReply = wait(feedReplyFuture);
@@ -5967,7 +5974,7 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
 	                      pOriginalReq->options.get().debugID.get().first(),
 	                      "storageserver.mapKeyValues.BeforeLoop");
 
-	for (; offset < sz && *remainingLimitBytes > 0; offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
+	for (; (offset < sz) && (*remainingLimitBytes > 0); offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
 		// Divide into batches of MAX_PARALLEL_QUICK_GET_VALUE subqueries
 		for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
 			KeyValueRef* it = &input.data[i + offset];
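The `mapKeyValues` loop above runs batch-by-batch and stops once either the input is exhausted or the reply's byte budget (`*remainingLimitBytes`) is spent. Below is a standalone sketch of that loop shape; the sizes, the flat per-item cost, and all names are illustrative stand-ins for the real accounting.

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

int main() {
	std::vector<int> input(23, 1); // 23 items, each "costing" 100 bytes to emit
	constexpr int kMaxParallel = 8; // plays the role of MAX_PARALLEL_QUICK_GET_VALUE
	int remainingLimitBytes = 1200; // analogous to *remainingLimitBytes

	int sz = static_cast<int>(input.size());
	int processed = 0;
	// Stop when either the input is exhausted OR the byte budget is spent;
	// checking the budget here prevents launching batches whose results
	// could never fit in the reply anyway.
	for (int offset = 0; (offset < sz) && (remainingLimitBytes > 0); offset += kMaxParallel) {
		int batchEnd = std::min(offset + kMaxParallel, sz);
		for (int i = offset; i < batchEnd && remainingLimitBytes > 0; i++) {
			remainingLimitBytes -= 100; // charge the reply budget per item
			processed++;
		}
	}
	std::cout << "processed " << processed << " of " << sz << "\n"; // 12 of 23
}
```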
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 6d1683173b..2ccaee79ec 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -1171,6 +1171,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 			TraceEvent(SevDebug, "PeerHealthMonitor")
 			    .detail("Peer", address)
+			    .detail("PeerAddress", address)
 			    .detail("Force", enablePrimaryTxnSystemHealthCheck->get())
 			    .detail("Elapsed", now() - lastLoggedTime)
 			    .detail("Disconnected", disconnectedPeer)
@@ -1201,6 +1202,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 			if (disconnectedPeer || degradedPeer) {
 				TraceEvent("HealthMonitorDetectDegradedPeer")
 				    .detail("Peer", address)
+				    .detail("PeerAddress", address)
 				    .detail("Elapsed", now() - lastLoggedTime)
 				    .detail("Disconnected", disconnectedPeer)
 				    .detail("MinLatency", peer->pingLatencies.min())
@@ -1229,6 +1231,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 			if (disconnectedPeer || degradedPeer) {
 				TraceEvent("HealthMonitorDetectDegradedPeer")
 				    .detail("Peer", address)
+				    .detail("PeerAddress", address)
 				    .detail("Satellite", true)
 				    .detail("Elapsed", now() - lastLoggedTime)
 				    .detail("Disconnected", disconnectedPeer)
@@ -1252,6 +1255,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 				TraceEvent("HealthMonitorDetectDegradedPeer")
 				    .detail("WorkerLocation", workerLocation)
 				    .detail("Peer", address)
+				    .detail("PeerAddress", address)
 				    .detail("RemoteLogRouter", true)
 				    .detail("Elapsed", now() - lastLoggedTime)
 				    .detail("Disconnected", true)
@@ -1272,6 +1276,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 				TraceEvent("HealthMonitorDetectDegradedPeer")
 				    .detail("WorkerLocation", workerLocation)
 				    .detail("Peer", address)
+				    .detail("PeerAddress", address)
 				    .detail("ExtensiveConnectivityCheck", true)
 				    .detail("Elapsed", now() - lastLoggedTime)
 				    .detail("Disconnected", true)
@@ -1291,7 +1296,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 			} else if (degradedPeer) {
 				req.degradedPeers.push_back(address);
 			} else if (isDegradedPeer(lastReq, address)) {
-				TraceEvent("HealthMonitorDetectRecoveredPeer").detail("Peer", address);
+				TraceEvent("HealthMonitorDetectRecoveredPeer").detail("Peer", address).detail("PeerAddress", address);
 				req.recoveredPeers.push_back(address);
 			}
 		}
@@ -1315,7 +1320,10 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
 		    (workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) ||
 		    (checkRemoteLogRouterConnectivity && (workerLocation == Primary || workerLocation == Satellite) &&
 		     addressIsRemoteLogRouter(address, dbInfo))) {
-			TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
+			TraceEvent("HealthMonitorDetectRecentClosedPeer")
+			    .suppressFor(30)
+			    .detail("Peer", address)
+			    .detail("PeerAddress", address);
 			req.disconnectedPeers.push_back(address);
 		}
 	}
{ - TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address); + TraceEvent("HealthMonitorDetectRecentClosedPeer") + .suppressFor(30) + .detail("Peer", address) + .detail("PeerAddress", address); req.disconnectedPeers.push_back(address); } } diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 4987b80bbe..c4fba885ce 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -26,6 +26,7 @@ #include "flow/Trace.h" #include #include +#include #ifndef BOOST_SYSTEM_NO_LIB #define BOOST_SYSTEM_NO_LIB #endif @@ -342,15 +343,21 @@ class BindPromise { Promise p; std::variant errContext; UID errID; + NetworkAddress peerAddr; public: BindPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {} BindPromise(AuditedEvent auditedEvent, UID errID) : errContext(auditedEvent), errID(errID) {} - BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID) {} - BindPromise(BindPromise&& r) noexcept : p(std::move(r.p)), errContext(r.errContext), errID(r.errID) {} + BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {} + BindPromise(BindPromise&& r) noexcept + : p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {} Future getFuture() const { return p.getFuture(); } + NetworkAddress getPeerAddr() const { return peerAddr; } + + void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; } + void operator()(const boost::system::error_code& error, size_t bytesWritten = 0) { try { if (error) { @@ -369,6 +376,11 @@ public: if (error.value() >= (1 << 24L)) { evt.detail("WhichMeans", TLSPolicy::ErrorString(error)); } + + if (peerAddr.isValid()) { + evt.detail("PeerAddr", peerAddr); + evt.detail("PeerAddress", peerAddr); + } } p.sendError(connection_failed()); @@ -533,6 +545,7 @@ private: TraceEvent(SevWarn, "N2_CloseError", id) .suppressFor(1.0) .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) .detail("ErrorCode", error.value()) .detail("Message", error.message()); } @@ -541,6 +554,7 @@ private: TraceEvent(SevWarn, "N2_ReadError", id) .suppressFor(1.0) .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) .detail("ErrorCode", error.value()) .detail("Message", error.message()); closeSocket(); @@ -549,6 +563,7 @@ private: TraceEvent(SevWarn, "N2_WriteError", id) .suppressFor(1.0) .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) .detail("ErrorCode", error.value()) .detail("Message", error.message()); closeSocket(); @@ -790,6 +805,14 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { Handshake(ssl_socket& socket, ssl_socket::handshake_type type) : socket(socket), type(type) {} double getTimeEstimate() const override { return 0.001; } + std::string getPeerAddress() const { + std::ostringstream o; + boost::system::error_code ec; + auto addr = socket.lowest_layer().remote_endpoint(ec); + o << (!ec.failed() ? addr.address().to_string() : std::string_view("0.0.0.0")); + return std::move(o).str(); + } + ThreadReturnPromise done; ssl_socket& socket; ssl_socket::handshake_type type; @@ -809,6 +832,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? 
"N2_ConnectHandshakeError"_audit : "N2_AcceptHandshakeError"_audit) + .detail("PeerAddr", h.getPeerAddress()) + .detail("PeerAddress", h.getPeerAddress()) .detail("ErrorCode", h.err.value()) .detail("ErrorMsg", h.err.message().c_str()) .detail("BackgroundThread", true); @@ -820,6 +845,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeUnknownError"_audit : "N2_AcceptHandshakeUnknownError"_audit) + .detail("PeerAddr", h.getPeerAddress()) + .detail("PeerAddress", h.getPeerAddress()) .detail("BackgroundThread", true); h.done.sendError(connection_failed()); } @@ -910,7 +937,8 @@ public: N2::g_net2->sslHandshakerPool->post(handshake); } else { // Otherwise use flow network thread - BindPromise p("N2_AcceptHandshakeError"_audit, UID()); + BindPromise p("N2_AcceptHandshakeError"_audit, self->id); + p.setPeerAddr(self->getPeerAddress()); onHandshook = p.getFuture(); self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p)); } @@ -993,6 +1021,7 @@ public: } else { // Otherwise use flow network thread BindPromise p("N2_ConnectHandshakeError"_audit, self->id); + p.setPeerAddr(self->getPeerAddress()); onHandshook = p.getFuture(); self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p)); } @@ -1156,6 +1185,7 @@ private: TraceEvent(SevWarn, "N2_ReadError", id) .suppressFor(1.0) .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) .detail("ErrorCode", error.value()) .detail("Message", error.message()); closeSocket(); @@ -1164,6 +1194,7 @@ private: TraceEvent(SevWarn, "N2_WriteError", id) .suppressFor(1.0) .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) .detail("ErrorCode", error.value()) .detail("Message", error.message()); closeSocket();