diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 154d870f88..7473d7525a 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -26,16 +26,20 @@ #include "flow/actorcompiler.h" // has to be last include // create a peek stream for cursor when it's possible -void tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { +ACTOR Future tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { if (self->peekReplyStream.present()) - return; + return Void(); else if (!self->interf || !self->interf->get().present()) { self->peekReplyStream.reset(); - return; + return Never(); } + wait(IFailureMonitor::failureMonitor().onStateEqual(self->interf->get().interf().peekStreamMessages.getEndpoint(), + FailureStatus(false))); self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest( self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits::max())); - TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID); + TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID) + .detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()); + return Void(); } ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference>> const& interf, @@ -328,21 +332,23 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T loop { try { - tryEstablishPeekStream(self); state Future fPeekReply = self->peekReplyStream.present() ? map(waitAndForward(self->peekReplyStream.get().getFuture()), [](const TLogPeekStreamReply& r) { return r.rep; }) : Never(); choose { + when(wait(self->peekReplyStream.present() ? Never() : tryEstablishPeekStream(self))) {} when(wait(self->interf->onChange())) { self->onlySpilled = false; self->peekReplyStream.reset(); } - when(TLogPeekReply res = - wait(self->peekReplyStream.present() - ? recordRequestMetrics( - self, self->peekReplyStream.get().getEndpoint().getPrimaryAddress(), fPeekReply) - : Never())) { + when(TLogPeekReply res = wait( + self->peekReplyStream.present() + ? recordRequestMetrics( + self, + self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress(), + fPeekReply) + : Never())) { updateCursorWithReply(self, res); TraceEvent("SPC_GetMoreB", self->randomID) .detail("Has", self->hasMessage()) @@ -352,6 +358,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T } } } catch (Error& e) { + TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what()); if (e.code() == error_code_connection_failed) { self->peekReplyStream.reset(); } else if (e.code() == error_code_end_of_stream) { @@ -406,15 +413,15 @@ Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { return Void(); if (!more.isValid() || more.isReady()) { more = serverPeekStreamGetMore(this, taskID); -// if (usePeekStream && taskID == TaskPriority::TLogPeekReply) { -// more = serverPeekStreamGetMore(this, taskID); -// } -// if (parallelGetMore || onlySpilled || futureResults.size()) { -// more = serverPeekParallelGetMore(this, taskID); -// } -// else { -// more = serverPeekGetMore(this, taskID); -// } + // if (usePeekStream && taskID == TaskPriority::TLogPeekReply) { + // more = serverPeekStreamGetMore(this, taskID); + // } + // if (parallelGetMore || onlySpilled || futureResults.size()) { + // more = serverPeekParallelGetMore(this, taskID); + // } + // else { + // more = serverPeekGetMore(this, taskID); + // } } return more; } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 9ad416986e..f437f24637 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -341,6 +341,7 @@ struct TLogData : NonCopyable { int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. int64_t overheadBytesInput; int64_t overheadBytesDurable; + int activePeekStreams = 0; WorkerCache tlogCache; FlowLock peekMemoryLimiter; @@ -557,7 +558,6 @@ struct LogData : NonCopyable, public ReferenceCounted { TLogData* tLogData; Promise recoveryComplete, committingQueue; Version unrecoveredBefore, recoveredAt; - int activePeekStreams = 0; struct PeekTrackerData { std::map>> @@ -669,7 +669,7 @@ struct LogData : NonCopyable, public ReferenceCounted { specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); specialCounter(cc, "Generation", [this]() { return this->recoveryCount; }); - specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; }); + specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { @@ -1919,7 +1919,8 @@ ACTOR Future peekTLog(TLogData* self, // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { - logData->activePeekStreams ++; + self->activePeekStreams++; + TraceEvent(SevDebug, "TLogPeekStream", logData->logId); state Version begin = req.begin; state bool onlySpilled = false; if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { @@ -1936,7 +1937,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref onlySpilled = reply.rep.onlySpilled; wait(delay(0, g_network->getCurrentTask())); } catch (Error& e) { - logData->activePeekStreams --; + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).detail("Error", e.what()); + if (e.code() == error_code_end_of_stream) { req.reply.sendError(e); return Void();