Add PeerAddress to all PeerAddr/Peer TraceEvent [release-7.3] (#11521)

* Add PeerAddress to all PeerAddr/Peer TraceEvent

This is to address #4846

* fixup!

* Decorate TLS handshake errors with peerAddr (#10090)

* Use connection debug ID in N2_AcceptHandshakeError

* Decorate TLS handshake errors with peerIP

* only write one value to ostream

* Add PeerAddress to all PeerAddr/Peer TraceEvent

This is to address #4846

---------

Co-authored-by: Sam Gwydir <sam.gwydir@snowflake.com>
This commit is contained in:
Xiaoge Su 2024-07-23 17:31:19 -07:00 committed by GitHub
parent 83ae9ac129
commit 08554e4f57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 124 additions and 34 deletions

View File

@ -365,7 +365,10 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
lastAddress = *it;
auto peer = self->getPeer(lastAddress);
if (!peer) {
TraceEvent(SevWarnAlways, "MissingNetworkAddress").suppressFor(10.0).detail("PeerAddr", lastAddress);
TraceEvent(SevWarnAlways, "MissingNetworkAddress")
.suppressFor(10.0)
.detail("PeerAddr", lastAddress)
.detail("PeerAddress", lastAddress);
}
if (peer->lastLoggedTime <= 0.0) {
peer->lastLoggedTime = peer->lastConnectTime;
@ -376,6 +379,7 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
TraceEvent("PingLatency")
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("PeerAddr", lastAddress)
.detail("PeerAddress", lastAddress)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
@ -649,6 +653,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
Future<Void> reader = Void()) {
TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination)
.detail("ConnSet", (bool)conn);
ASSERT_WE_THINK(FlowTransport::transport().getLocalAddress() != self->destination);
@ -691,6 +696,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination)
.detail("PeerReferences", self->peerReferences)
.detail("FailureStatus",
IFailureMonitor::failureMonitor().getState(self->destination).isAvailable() ? "OK"
@ -722,7 +728,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
self->prependConnectPacket();
reader = connectionReader(self->transport, conn, self, Promise<Reference<Peer>>());
}
@ -737,7 +744,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
}
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
throw;
}
@ -756,7 +764,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
self->resetConnection.onTrigger());
TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
throw connection_failed();
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
@ -782,7 +791,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
if (now() - firstConnFailedTime.get() > FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT) {
TraceEvent(SevWarnAlways, "PeerUnavailableForLongTime", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
firstConnFailedTime = now() - FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT / 2.0;
}
} else {
@ -811,13 +821,15 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID())
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
} else {
TraceEvent(
ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID())
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
// Since the connection has closed, we need to check the protocol version the next time we connect
self->compatible = true;
@ -832,7 +844,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
} else if (now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID())
.suppressFor(5.0)
.detail("PeerAddr", self->destination);
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
self->transport->degraded->set(true);
}
it.second = now();
@ -877,7 +890,11 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
if (self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty() &&
self->outstandingReplies == 0) {
TraceEvent("PeerDestroy").errorUnsuppressed(e).suppressFor(1.0).detail("PeerAddr", self->destination);
TraceEvent("PeerDestroy")
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
self->transport->orderedAddresses.erase(self->destination);
@ -1063,7 +1080,8 @@ ACTOR static void deliver(TransportData* self,
TraceEvent(SevError, "ReceiverError")
.error(e)
.detail("Token", destination.token.toString())
.detail("Peer", destination.getPrimaryAddress());
.detail("Peer", destination.getPrimaryAddress())
.detail("PeerAddress", destination.getPrimaryAddress());
if (!FlowTransport::isClient()) {
flushAndExit(FDB_EXIT_ERROR);
}
@ -1360,6 +1378,10 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
: conn->getPeerAddress())
.detail("PeerAddress",
pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)
: conn->getPeerAddress())
.detail("ConnectionId", connectionId);
transport->lastIncompatibleMessage = now();
}
@ -1385,6 +1407,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
TraceEvent("ConnectionEstablished", conn->getDebugID())
.suppressFor(1.0)
.detail("Peer", conn->getPeerAddress())
.detail("PeerAddress", conn->getPeerAddress())
.detail("ConnectionId", connectionId);
}
@ -1400,7 +1423,9 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
// Outgoing connection; port information should be what we expect
TraceEvent("ConnectedOutgoing")
.suppressFor(1.0)
.detail("PeerAddr", NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort));
.detail("PeerAddr", NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort))
.detail("PeerAddress",
NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort));
peer->compatible = compatible;
if (!compatible) {
peer->transport->numIncompatibleConnections++;

View File

@ -361,7 +361,8 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
TraceEvent("SimulatedDisconnection")
.detail("Phase", "Connect")
.detail("Address", process->address)
.detail("Peer", peerProcess->address);
.detail("Peer", peerProcess->address)
.detail("PeerAddress", peerProcess->address);
throw connection_failed();
}
@ -583,6 +584,7 @@ private:
TraceEvent("ConnectionFailure", dbgid)
.detail("MyAddr", process->address)
.detail("PeerAddr", peerProcess->address)
.detail("PeerAddress", peerProcess->address)
.detail("PeerIsValid", peer.isValid())
.detail("SendClosed", a > .33)
.detail("RecvClosed", a < .66)
@ -616,6 +618,7 @@ private:
.detail("MyAddr", self->process->address)
.detail("IsPublic", self->process->address.isPublic())
.detail("PeerAddr", self->peerEndpoint)
.detail("PeerAddress", self->peerEndpoint)
.detail("PeerId", self->peerId)
.detail("Opened", self->opened);
return Void();

View File

@ -1031,6 +1031,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
for (int r = 0; r < self->pProxyCommitData->resolvers.size(); r++) {
TraceEvent(SevWarnAlways, "ResetResolverNetwork", self->pProxyCommitData->dbgid)
.detail("PeerAddr", self->pProxyCommitData->resolvers[r].address())
.detail("PeerAddress", self->pProxyCommitData->resolvers[r].address())
.detail("CurrentBatch", self->localBatchNumber)
.detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchLogging.get());
FlowTransport::transport().resetConnection(self->pProxyCommitData->resolvers[r].address());

View File

@ -1182,7 +1182,8 @@ Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("ConvertedErrorType", e.what())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
.detail("Peer", stream.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", stream.getEndpoint().getPrimaryAddress());
throw e;
}
return Void();
@ -1197,6 +1198,7 @@ ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<WorkerSnapRequest> stre
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", stream.getEndpoint().getPrimaryAddress())
.detail("Retry", snapReqRetry);
if (reply.getError().code() != error_code_request_maybe_delivered ||
++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)

View File

@ -703,7 +703,8 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid)
.errorUnsuppressed(e)
.detail("Tag", req.tag)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -43,6 +43,7 @@ ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
DebugLogTraceEvent(SevDebug, "SPC_StreamCreated", self->randomID)
.detail("Tag", self->tag)
.detail("PeerAddr", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
.detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token);
return Void();
}

View File

@ -1143,7 +1143,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.errorUnsuppressed(e)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -1459,7 +1459,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.errorUnsuppressed(e)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -1887,7 +1887,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.errorUnsuppressed(e)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -2186,7 +2186,8 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.errorUnsuppressed(e)
.detail("Tag", req.tag)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -3034,7 +3034,8 @@ public:
TraceEvent("ClusterControllerReceivedPeerRecovering")
.suppressFor(10.0)
.detail("Worker", req.address)
.detail("Peer", peer);
.detail("Peer", peer)
.detail("PeerAddress", peer);
health.degradedPeers.erase(peer);
health.disconnectedPeers.erase(peer);
}
@ -3066,7 +3067,10 @@ public:
for (auto& [workerAddress, health] : workerHealth) {
for (auto it = health.degradedPeers.begin(); it != health.degradedPeers.end();) {
if (currentTime - it->second.lastRefreshTime > SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL) {
TraceEvent("WorkerPeerHealthRecovered").detail("Worker", workerAddress).detail("Peer", it->first);
TraceEvent("WorkerPeerHealthRecovered")
.detail("Worker", workerAddress)
.detail("Peer", it->first)
.detail("PeerAddress", it->first);
health.degradedPeers.erase(it++);
} else {
++it;
@ -3074,7 +3078,10 @@ public:
}
for (auto it = health.disconnectedPeers.begin(); it != health.disconnectedPeers.end();) {
if (currentTime - it->second.lastRefreshTime > SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL) {
TraceEvent("WorkerPeerHealthRecovered").detail("Worker", workerAddress).detail("Peer", it->first);
TraceEvent("WorkerPeerHealthRecovered")
.detail("Worker", workerAddress)
.detail("Peer", it->first)
.detail("PeerAddress", it->first);
health.disconnectedPeers.erase(it++);
} else {
++it;

View File

@ -3192,7 +3192,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
if (data->version.get() < req.begin) {
@ -3239,7 +3240,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("FetchVersion", feedInfo->fetchVersion)
.detail("DurableFetchVersion", feedInfo->durableFetchVersion.get())
.detail("DurableValidationVersion", durableValidationVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
if (req.end > emptyVersion + 1) {
@ -3667,7 +3669,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("PopVersion", reply.popVersion)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
// If the SS's version advanced at all during any of the waits, the read from memory may have missed some
@ -3749,7 +3752,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
Version checkTooOldVersion = (!req.canReadPopped || req.end == MAX_VERSION) ? req.begin : req.end;
@ -3791,7 +3795,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", req.begin - 1)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
loop {
@ -3808,7 +3813,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
removeUID = true;
}
@ -3830,7 +3836,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.detail("PeerAddress", req.reply.getEndpoint().getPrimaryAddress());
}
}
std::pair<ChangeFeedStreamReply, bool> _feedReply = wait(feedReplyFuture);
@ -5967,7 +5974,7 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
pOriginalReq->options.get().debugID.get().first(),
"storageserver.mapKeyValues.BeforeLoop");
for (; offset<sz&& * remainingLimitBytes> 0; offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
for (; (offset < sz) && (*remainingLimitBytes > 0); offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
// Divide into batches of MAX_PARALLEL_QUICK_GET_VALUE subqueries
for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
KeyValueRef* it = &input.data[i + offset];

View File

@ -1171,6 +1171,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
TraceEvent(SevDebug, "PeerHealthMonitor")
.detail("Peer", address)
.detail("PeerAddress", address)
.detail("Force", enablePrimaryTxnSystemHealthCheck->get())
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
@ -1201,6 +1202,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
if (disconnectedPeer || degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address)
.detail("PeerAddress", address)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
.detail("MinLatency", peer->pingLatencies.min())
@ -1229,6 +1231,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
if (disconnectedPeer || degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address)
.detail("PeerAddress", address)
.detail("Satellite", true)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
@ -1252,6 +1255,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("WorkerLocation", workerLocation)
.detail("Peer", address)
.detail("PeerAddress", address)
.detail("RemoteLogRouter", true)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", true)
@ -1272,6 +1276,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("WorkerLocation", workerLocation)
.detail("Peer", address)
.detail("PeerAddress", address)
.detail("ExtensiveConnectivityCheck", true)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", true)
@ -1291,7 +1296,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
} else if (degradedPeer) {
req.degradedPeers.push_back(address);
} else if (isDegradedPeer(lastReq, address)) {
TraceEvent("HealthMonitorDetectRecoveredPeer").detail("Peer", address);
TraceEvent("HealthMonitorDetectRecoveredPeer").detail("Peer", address).detail("PeerAddress", address);
req.recoveredPeers.push_back(address);
}
}
@ -1315,7 +1320,10 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
(workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) ||
(checkRemoteLogRouterConnectivity && (workerLocation == Primary || workerLocation == Satellite) &&
addressIsRemoteLogRouter(address, dbInfo))) {
TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
TraceEvent("HealthMonitorDetectRecentClosedPeer")
.suppressFor(30)
.detail("Peer", address)
.detail("PeerAddress", address);
req.disconnectedPeers.push_back(address);
}
}

View File

@ -26,6 +26,7 @@
#include "flow/Trace.h"
#include <algorithm>
#include <memory>
#include <string_view>
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
@ -342,15 +343,21 @@ class BindPromise {
Promise<Void> p;
std::variant<const char*, AuditedEvent> errContext;
UID errID;
NetworkAddress peerAddr;
public:
BindPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {}
BindPromise(AuditedEvent auditedEvent, UID errID) : errContext(auditedEvent), errID(errID) {}
BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID) {}
BindPromise(BindPromise&& r) noexcept : p(std::move(r.p)), errContext(r.errContext), errID(r.errID) {}
BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {}
BindPromise(BindPromise&& r) noexcept
: p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {}
Future<Void> getFuture() const { return p.getFuture(); }
NetworkAddress getPeerAddr() const { return peerAddr; }
void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; }
void operator()(const boost::system::error_code& error, size_t bytesWritten = 0) {
try {
if (error) {
@ -369,6 +376,11 @@ public:
if (error.value() >= (1 << 24L)) {
evt.detail("WhichMeans", TLSPolicy::ErrorString(error));
}
if (peerAddr.isValid()) {
evt.detail("PeerAddr", peerAddr);
evt.detail("PeerAddress", peerAddr);
}
}
p.sendError(connection_failed());
@ -533,6 +545,7 @@ private:
TraceEvent(SevWarn, "N2_CloseError", id)
.suppressFor(1.0)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
}
@ -541,6 +554,7 @@ private:
TraceEvent(SevWarn, "N2_ReadError", id)
.suppressFor(1.0)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();
@ -549,6 +563,7 @@ private:
TraceEvent(SevWarn, "N2_WriteError", id)
.suppressFor(1.0)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();
@ -790,6 +805,14 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
Handshake(ssl_socket& socket, ssl_socket::handshake_type type) : socket(socket), type(type) {}
double getTimeEstimate() const override { return 0.001; }
// Best-effort textual form of the remote peer's IP, used to decorate TLS
// handshake error TraceEvents. Returns "0.0.0.0" when the endpoint cannot
// be queried (e.g. the socket is already disconnected) instead of throwing.
std::string getPeerAddress() const {
	boost::system::error_code ec;
	// remote_endpoint(ec) reports failure via `ec` rather than throwing,
	// which matters here because this is called on error/cleanup paths.
	const auto endpoint = socket.lowest_layer().remote_endpoint(ec);
	// Build the string directly; no need for an ostringstream (and the
	// associated extra allocation) just to render a single value.
	return ec.failed() ? std::string("0.0.0.0") : endpoint.address().to_string();
}
ThreadReturnPromise<Void> done;
ssl_socket& socket;
ssl_socket::handshake_type type;
@ -809,6 +832,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
TraceEvent(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit
: "N2_AcceptHandshakeError"_audit)
.detail("PeerAddr", h.getPeerAddress())
.detail("PeerAddress", h.getPeerAddress())
.detail("ErrorCode", h.err.value())
.detail("ErrorMsg", h.err.message().c_str())
.detail("BackgroundThread", true);
@ -820,6 +845,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
TraceEvent(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeUnknownError"_audit
: "N2_AcceptHandshakeUnknownError"_audit)
.detail("PeerAddr", h.getPeerAddress())
.detail("PeerAddress", h.getPeerAddress())
.detail("BackgroundThread", true);
h.done.sendError(connection_failed());
}
@ -910,7 +937,8 @@ public:
N2::g_net2->sslHandshakerPool->post(handshake);
} else {
// Otherwise use flow network thread
BindPromise p("N2_AcceptHandshakeError"_audit, UID());
BindPromise p("N2_AcceptHandshakeError"_audit, self->id);
p.setPeerAddr(self->getPeerAddress());
onHandshook = p.getFuture();
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p));
}
@ -993,6 +1021,7 @@ public:
} else {
// Otherwise use flow network thread
BindPromise p("N2_ConnectHandshakeError"_audit, self->id);
p.setPeerAddr(self->getPeerAddress());
onHandshook = p.getFuture();
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
}
@ -1156,6 +1185,7 @@ private:
TraceEvent(SevWarn, "N2_ReadError", id)
.suppressFor(1.0)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();
@ -1164,6 +1194,7 @@ private:
TraceEvent(SevWarn, "N2_WriteError", id)
.suppressFor(1.0)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("ErrorCode", error.value())
.detail("Message", error.message());
closeSocket();