more debug logs; let tryEstablishStream wait until the connection is good

This commit is contained in:
Xiaoxi Wang 2021-07-19 18:43:51 +00:00
parent 36dca1f927
commit f3667ce91a
2 changed files with 33 additions and 23 deletions

View File

@ -26,16 +26,20 @@
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
// create a peek stream for cursor when it's possible // create a peek stream for cursor when it's possible
void tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
if (self->peekReplyStream.present()) if (self->peekReplyStream.present())
return; return Void();
else if (!self->interf || !self->interf->get().present()) { else if (!self->interf || !self->interf->get().present()) {
self->peekReplyStream.reset(); self->peekReplyStream.reset();
return; return Never();
} }
wait(IFailureMonitor::failureMonitor().onStateEqual(self->interf->get().interf().peekStreamMessages.getEndpoint(),
FailureStatus(false)));
self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest( self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max())); self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID); TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID)
.detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress());
return Void();
} }
ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
@ -328,20 +332,22 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
loop { loop {
try { try {
tryEstablishPeekStream(self);
state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present() state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present()
? map(waitAndForward(self->peekReplyStream.get().getFuture()), ? map(waitAndForward(self->peekReplyStream.get().getFuture()),
[](const TLogPeekStreamReply& r) { return r.rep; }) [](const TLogPeekStreamReply& r) { return r.rep; })
: Never(); : Never();
choose { choose {
when(wait(self->peekReplyStream.present() ? Never() : tryEstablishPeekStream(self))) {}
when(wait(self->interf->onChange())) { when(wait(self->interf->onChange())) {
self->onlySpilled = false; self->onlySpilled = false;
self->peekReplyStream.reset(); self->peekReplyStream.reset();
} }
when(TLogPeekReply res = when(TLogPeekReply res = wait(
wait(self->peekReplyStream.present() self->peekReplyStream.present()
? recordRequestMetrics( ? recordRequestMetrics(
self, self->peekReplyStream.get().getEndpoint().getPrimaryAddress(), fPeekReply) self,
self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress(),
fPeekReply)
: Never())) { : Never())) {
updateCursorWithReply(self, res); updateCursorWithReply(self, res);
TraceEvent("SPC_GetMoreB", self->randomID) TraceEvent("SPC_GetMoreB", self->randomID)
@ -352,6 +358,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
} }
} }
} catch (Error& e) { } catch (Error& e) {
TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what());
if (e.code() == error_code_connection_failed) { if (e.code() == error_code_connection_failed) {
self->peekReplyStream.reset(); self->peekReplyStream.reset();
} else if (e.code() == error_code_end_of_stream) { } else if (e.code() == error_code_end_of_stream) {

View File

@ -341,6 +341,7 @@ struct TLogData : NonCopyable {
int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
int64_t overheadBytesInput; int64_t overheadBytesInput;
int64_t overheadBytesDurable; int64_t overheadBytesDurable;
int activePeekStreams = 0;
WorkerCache<TLogInterface> tlogCache; WorkerCache<TLogInterface> tlogCache;
FlowLock peekMemoryLimiter; FlowLock peekMemoryLimiter;
@ -557,7 +558,6 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
TLogData* tLogData; TLogData* tLogData;
Promise<Void> recoveryComplete, committingQueue; Promise<Void> recoveryComplete, committingQueue;
Version unrecoveredBefore, recoveredAt; Version unrecoveredBefore, recoveredAt;
int activePeekStreams = 0;
struct PeekTrackerData { struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> std::map<int, Promise<std::pair<Version, bool>>>
@ -669,7 +669,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
specialCounter(cc, "Generation", [this]() { return this->recoveryCount; }); specialCounter(cc, "Generation", [this]() { return this->recoveryCount; });
specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; }); specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; });
} }
~LogData() { ~LogData() {
@ -1919,7 +1919,8 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) { ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
logData->activePeekStreams ++; self->activePeekStreams++;
TraceEvent(SevDebug, "TLogPeekStream", logData->logId);
state Version begin = req.begin; state Version begin = req.begin;
state bool onlySpilled = false; state bool onlySpilled = false;
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
@ -1936,7 +1937,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
onlySpilled = reply.rep.onlySpilled; onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask())); wait(delay(0, g_network->getCurrentTask()));
} catch (Error& e) { } catch (Error& e) {
logData->activePeekStreams --; self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).detail("Error", e.what());
if (e.code() == error_code_end_of_stream) { if (e.code() == error_code_end_of_stream) {
req.reply.sendError(e); req.reply.sendError(e);
return Void(); return Void();