diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp
index 2dc8194d3a..5ab13ee1a8 100644
--- a/fdbserver/LogRouter.actor.cpp
+++ b/fdbserver/LogRouter.actor.cpp
@@ -357,6 +357,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
 	reply.messages = messages.toValue();
 	reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
 	reply.end = endVersion;
+	reply.onlySpilled = false;
 
 	req.reply.send( reply );
 	//TraceEvent("LogRouterPeek4", self->dbgid);
diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h
index fb9c5506fa..1b998ff769 100644
--- a/fdbserver/LogSystem.h
+++ b/fdbserver/LogSystem.h
@@ -379,6 +379,7 @@ struct ILogSystem {
 		UID randomID;
 		bool returnIfBlocked;
 
+		bool onlySpilled;
 		bool parallelGetMore;
 		int sequence;
 		Deque<Future<TLogPeekReply>> futureResults;
diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp
index 0ce171274e..d73789c3fa 100644
--- a/fdbserver/LogSystemPeekCursor.actor.cpp
+++ b/fdbserver/LogSystemPeekCursor.actor.cpp
@@ -25,14 +25,14 @@
 #include "flow/actorcompiler.h" // has to be last include
 
 ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
-			: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
+			: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) {
 	this->results.maxKnownVersion = 0;
 	this->results.minKnownCommittedVersion = 0;
 	//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
 }
 
 ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag )
-			: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(g_random->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), parallelGetMore(false)
+			: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(g_random->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false)
 {
 	//TraceEvent("SPC_Clone", randomID);
 	this->results.maxKnownVersion = 0;
@@ -147,7 +147,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
 		state Version expectedBegin = self->messageVersion.version;
 		try {
 			while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
-				self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
+				self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
 			}
 
 			choose {
@@ -158,6 +158,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
 					expectedBegin = res.end;
 					self->futureResults.pop_front();
 					self->results = res;
+					self->onlySpilled = res.onlySpilled;
 					if(res.popped.present())
 						self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
 					self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@@ -172,6 +173,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
 					self->interfaceChanged = self->interf->onChange();
 					self->randomID = g_random->randomUniqueID();
 					self->sequence = 0;
+					self->onlySpilled = false;
 					self->futureResults.clear();
 				}
 			}
@@ -201,8 +203,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
 		loop {
 			choose {
 				when( TLogPeekReply res = wait( self->interf->get().present() ?
-					brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), taskID) ) : Never() ) ) {
+					brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled), taskID) ) : Never() ) ) {
 					self->results = res;
+					self->onlySpilled = res.onlySpilled;
 					if(res.popped.present())
 						self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
 					self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@@ -213,7 +216,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
 					//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
 					return Void();
 				}
-				when( wait( self->interf->onChange() ) ) {}
+				when( wait( self->interf->onChange() ) ) {
+					self->onlySpilled = false;
+				}
 			}
 		}
 	} catch( Error &e ) {
diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp
index 9f86bd7574..aa7b43b169 100644
--- a/fdbserver/OldTLogServer_6_0.actor.cpp
+++ b/fdbserver/OldTLogServer_6_0.actor.cpp
@@ -1018,6 +1018,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
 			rep.popped = poppedVer;
 			rep.end = poppedVer;
+			rep.onlySpilled = false;
 
 			if(req.sequence.present()) {
 				auto& trackerData = self->peekTracker[peekId];
@@ -1044,6 +1045,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	}
 
 	state Version endVersion = logData->version.get() + 1;
+	state bool onlySpilled = false;
 
 	//grab messages from disk
 	//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@@ -1053,7 +1055,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
 		// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
 
-		peekMessagesFromMemory( logData, req, messages2, endVersion );
+		if (req.onlySpilled) {
+			endVersion = logData->persistentDataDurableVersion + 1;
+		} else {
+			peekMessagesFromMemory( logData, req, messages2, endVersion );
+		}
 
 		Standalone<VectorRef<KeyValueRef>> kvs = wait(
 			self->persistentData->readRange(KeyRangeRef(
@@ -1068,10 +1074,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			messages.serializeBytes(kv.value);
 		}
 
-		if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
+		if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
 			endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
-		else
+			onlySpilled = true;
+		} else {
 			messages.serializeBytes( messages2.toValue() );
+		}
 	} else {
 		peekMessagesFromMemory( logData, req, messages, endVersion );
 		//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
@@ -1082,6 +1090,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
 	reply.messages = messages.toValue();
 	reply.end = endVersion;
+	reply.onlySpilled = onlySpilled;
 
 	//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);
 
diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h
index 585caa23a3..f9c3db645b 100644
--- a/fdbserver/TLogInterface.h
+++ b/fdbserver/TLogInterface.h
@@ -150,10 +150,11 @@ struct TLogPeekReply {
 	Version maxKnownVersion;
 	Version minKnownCommittedVersion;
 	Optional<Version> begin;
+	bool onlySpilled;
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin);
+		serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin, onlySpilled);
 	}
 };
 
@@ -163,15 +164,16 @@ struct TLogPeekRequest {
 	Version begin;
 	Tag tag;
 	bool returnIfBlocked;
+	bool onlySpilled;
 	Optional<std::pair<UID, int>> sequence;
 	ReplyPromise<TLogPeekReply> reply;
 
-	TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence) {}
+	TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, bool onlySpilled, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence), onlySpilled(onlySpilled) {}
 	TLogPeekRequest() {}
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, arena, begin, tag, returnIfBlocked, sequence, reply);
+		serializer(ar, arena, begin, tag, returnIfBlocked, onlySpilled, sequence, reply);
 	}
 };
 
diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp
index 55051ff6e4..3f602a028b 100644
--- a/fdbserver/TLogServer.actor.cpp
+++ b/fdbserver/TLogServer.actor.cpp
@@ -1304,6 +1304,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
 			rep.popped = poppedVer;
 			rep.end = poppedVer;
+			rep.onlySpilled = false;
 
 			if(req.sequence.present()) {
 				auto& trackerData = self->peekTracker[peekId];
@@ -1330,6 +1331,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	}
 
 	state Version endVersion = logData->version.get() + 1;
+	state bool onlySpilled = false;
 
 	//grab messages from disk
 	//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@@ -1339,7 +1341,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
 		// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
 
-		peekMessagesFromMemory( logData, req, messages2, endVersion );
+		if (req.onlySpilled) {
+			endVersion = logData->persistentDataDurableVersion + 1;
+		} else {
+			peekMessagesFromMemory( logData, req, messages2, endVersion );
+		}
 
 		if (req.tag == txsTag) {
 			Standalone<VectorRef<KeyValueRef>> kvs = wait(
@@ -1353,10 +1359,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 				messages.serializeBytes(kv.value);
 			}
 
-			if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
+			if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
 				endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
-			else
+				onlySpilled = true;
+			} else {
 				messages.serializeBytes( messages2.toValue() );
+			}
 		} else {
 			// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
 			Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
@@ -1433,13 +1441,20 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			messageReads.clear();
 			memoryReservation.release();
 
-			if (earlyEnd)
+			if (earlyEnd) {
 				endVersion = lastRefMessageVersion + 1;
-			else
+				onlySpilled = true;
+			} else {
 				messages.serializeBytes( messages2.toValue() );
+			}
 		}
 	} else {
-		peekMessagesFromMemory( logData, req, messages, endVersion );
+		if (req.onlySpilled) {
+			endVersion = logData->persistentDataDurableVersion + 1;
+		} else {
+			peekMessagesFromMemory( logData, req, messages, endVersion );
+		}
+
 		//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
 	}
 
@@ -1448,6 +1463,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
 	reply.messages = messages.toValue();
 	reply.end = endVersion;
+	reply.onlySpilled = onlySpilled;
 
 	//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());