Add more debug events

This commit is contained in:
Jingyu Zhou 2022-11-17 11:36:36 -08:00
parent c6ebdd8ae8
commit f285a91f6c
3 changed files with 72 additions and 24 deletions

View File

@ -28,6 +28,7 @@
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Histogram.h"
#include "flow/Trace.h"
#include "flow/network.h"
#include "flow/DebugTrace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -448,6 +449,14 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
state int sequence = -1;
state UID peekId;
DebugLogTraceEvent("LogRouterPeek0", self->dbgid)
.detail("ReturnIfBlocked", reqReturnIfBlocked)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Ver", self->version.get())
.detail("Begin", reqBegin);
if (reqSequence.present()) {
try {
peekId = reqSequence.get().first;
@ -481,6 +490,13 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
DebugLogTraceEvent("LogRouterPeekError", self->dbgid)
.error(e)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Begin", reqBegin);
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
@ -490,12 +506,6 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
}
DebugLogTraceEvent("LogRouterPeek0", self->dbgid)
.detail("ReturnIfBlocked", reqReturnIfBlocked)
.detail("Tag", reqTag.toString())
.detail("Ver", self->version.get())
.detail("Begin", reqBegin);
if (reqReturnIfBlocked && self->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
@ -528,24 +538,22 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid)
.detail("Begin", reqBegin)
.detail("Popped", poppedVer)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Start", self->startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
}
if (std::is_same<PromiseType, ReplyPromise<TLogPeekReply>>::value) {
// Send error to avoid a race condition that the peer is really retrying,
// otherwise, the peer could be blocked forever.
replyPromise.sendError(operation_obsolete());
} else {
replyPromise.send(Never());
}
/*if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}*/
return Void();
}
@ -686,6 +694,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
if (!tagData) {
tagData = self->createTagData(req.tag, req.to, req.durableKnownCommittedVersion);
} else if (req.to > tagData->popped) {
DebugLogTraceEvent("LogRouterPop", self->dbgid).detail("Tag", req.tag.toString()).detail("PopVersion", req.to);
tagData->popped = req.to;
tagData->durableKnownCommittedVersion = req.durableKnownCommittedVersion;
wait(tagData->eraseMessagesBefore(req.to, self, TaskPriority::TLogPop));

View File

@ -62,6 +62,8 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
this->results.minKnownCommittedVersion = 0;
DebugLogTraceEvent(SevDebug, "SPC_Starting", randomID)
.detail("Tag", tag.toString())
.detail("Parallel", parallelGetMore)
.detail("Interf", interf && interf->get().present() ? interf->get().id() : UID())
.detail("UsePeekStream", usePeekStream)
.detail("Begin", begin)
.detail("End", end);
@ -111,7 +113,9 @@ bool ILogSystem::ServerPeekCursor::hasMessage() const {
}
void ILogSystem::ServerPeekCursor::nextMessage() {
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
DebugLogTraceEvent("SPC_NextMessage", randomID)
.detail("Tag", tag.toString())
.detail("MessageVersion", messageVersion.toString());
ASSERT(hasMsg);
if (rd.empty()) {
messageVersion.reset(std::min(results.end, end.version));
@ -143,11 +147,13 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
rd.rewind();
rd.readBytes(messageAndTags.getHeaderSize());
hasMsg = true;
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
DebugLogTraceEvent("SPC_NextMessageB", randomID)
.detail("Tag", tag.toString())
.detail("MessageVersion", messageVersion.toString());
}
StringRef ILogSystem::ServerPeekCursor::getMessage() {
//TraceEvent("SPC_GetMessage", randomID);
DebugLogTraceEvent("SPC_GetMessage", randomID).detail("Tag", tag.toString());
StringRef message = messageAndTags.getMessageWithoutTags();
rd.readBytes(message.size()); // Consumes the message.
return message;
@ -260,6 +266,14 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
}
loop {
DebugLogTraceEvent("SPC_GetMoreP", self->randomID)
.detail("Tag", self->tag.toString())
.detail("Has", self->hasMessage())
.detail("Begin", self->messageVersion.version)
.detail("Parallel", self->parallelGetMore)
.detail("Seq", self->sequence)
.detail("Sizes", self->futureResults.size())
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
state Version expectedBegin = self->messageVersion.version;
try {
if (self->parallelGetMore || self->onlySpilled) {
@ -294,7 +308,12 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
expectedBegin = res.end;
self->futureResults.pop_front();
updateCursorWithReply(self, res);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
DebugLogTraceEvent("SPC_GetMoreReply", self->randomID)
.detail("Has", self->hasMessage())
.detail("Tag", self->tag.toString())
.detail("End", res.end)
.detail("Size", self->futureResults.size())
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when(wait(self->interfaceChanged)) {
@ -306,11 +325,17 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
}
}
} catch (Error& e) {
DebugLogTraceEvent("PeekCursorError", self->randomID)
.error(e)
.detail("Tag", self->tag.toString())
.detail("Begin", self->messageVersion.version)
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
if (e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
return Void();
} else if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
TraceEvent ev("PeekCursorTimedOut", self->randomID);
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
// and thus we've likely just wasted 10min.
// timed_out() is sent by cleanupPeekTrackers as value PEEK_TRACKER_EXPIRATION_TIME
@ -326,6 +351,11 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
self->futureResults.clear();
ev.error(e)
.detail("Tag", self->tag.toString())
.detail("Begin", self->messageVersion.version)
.detail("NewID", self->randomID)
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
} else {
throw e;
}
@ -415,7 +445,11 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
taskID))
: Never())) {
updateCursorWithReply(self, res);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
DebugLogTraceEvent("SPC_GetMoreB", self->randomID)
.detail("Tag", self->tag.toString())
.detail("Has", self->hasMessage())
.detail("End", res.end)
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when(wait(self->interf->onChange())) { self->onlySpilled = false; }
@ -431,11 +465,13 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
}
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
// TraceEvent("SPC_GetMore", randomID)
// .detail("HasMessage", hasMessage())
// .detail("More", !more.isValid() || more.isReady())
// .detail("MessageVersion", messageVersion.toString())
// .detail("End", end.toString());
DebugLogTraceEvent("SPC_GetMore", randomID)
.detail("Tag", tag.toString())
.detail("HasMessage", hasMessage())
.detail("More", !more.isValid() || more.isReady())
.detail("Parallel", parallelGetMore)
.detail("MessageVersion", messageVersion.toString())
.detail("End", end.toString());
if (hasMessage() && !parallelGetMore)
return Void();
if (!more.isValid() || more.isReady()) {

View File

@ -2187,6 +2187,9 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
if (logData->logSystem->get() &&
(!logData->isPrimary || logData->logRouterPoppedVersion < logData->logRouterPopToVersion)) {
logData->logRouterPoppedVersion = ver;
DebugLogTraceEvent("LogPop", self->dbgid)
.detail("Tag", logData->remoteTag.toString())
.detail("Version", knownCommittedVersion);
logData->logSystem->get()->pop(ver, logData->remoteTag, knownCommittedVersion, logData->locality);
}