Save TLog resources by letting peek request only spilled data.

If a peek is entirely fulfilled from spilled data, then it's likely that
the next peek will be also.  It is thus wasteful for each of these peeks
to call peekMessagesFromMemory, which memcpy's excessively, and then
throw all that data away without using it.

Now, TLogs will give a hint back to peek cursors about if the provided
reply was served entirely from the spilled data, which peek curors then
feed back as the hint into their next request.

At some point, a cursor will send a request for only spilled data, get
an incomplete response, and then be told to send its next request as one
that peeks from memory as well, and then it will fully catch up.
This commit is contained in:
Alex Miller 2019-05-14 14:07:49 -10:00
parent a9b03ec505
commit 4eb4c03ce5
6 changed files with 51 additions and 17 deletions

View File

@ -357,6 +357,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
reply.messages = messages.toValue(); reply.messages = messages.toValue();
reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0; reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
reply.end = endVersion; reply.end = endVersion;
reply.onlySpilled = false;
req.reply.send( reply ); req.reply.send( reply );
//TraceEvent("LogRouterPeek4", self->dbgid); //TraceEvent("LogRouterPeek4", self->dbgid);

View File

@ -379,6 +379,7 @@ struct ILogSystem {
UID randomID; UID randomID;
bool returnIfBlocked; bool returnIfBlocked;
bool onlySpilled;
bool parallelGetMore; bool parallelGetMore;
int sequence; int sequence;
Deque<Future<TLogPeekReply>> futureResults; Deque<Future<TLogPeekReply>> futureResults;

View File

@ -25,14 +25,14 @@
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore ) ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) { : interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) {
this->results.maxKnownVersion = 0; this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0; this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace(); //TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
} }
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag ) ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag )
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(g_random->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), parallelGetMore(false) : results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(g_random->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false)
{ {
//TraceEvent("SPC_Clone", randomID); //TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0; this->results.maxKnownVersion = 0;
@ -147,7 +147,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
state Version expectedBegin = self->messageVersion.version; state Version expectedBegin = self->messageVersion.version;
try { try {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) { while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) ); self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
} }
choose { choose {
@ -158,6 +158,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
expectedBegin = res.end; expectedBegin = res.end;
self->futureResults.pop_front(); self->futureResults.pop_front();
self->results = res; self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present()) if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version ); self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() ); self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@ -172,6 +173,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
self->interfaceChanged = self->interf->onChange(); self->interfaceChanged = self->interf->onChange();
self->randomID = g_random->randomUniqueID(); self->randomID = g_random->randomUniqueID();
self->sequence = 0; self->sequence = 0;
self->onlySpilled = false;
self->futureResults.clear(); self->futureResults.clear();
} }
} }
@ -201,8 +203,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
loop { loop {
choose { choose {
when( TLogPeekReply res = wait( self->interf->get().present() ? when( TLogPeekReply res = wait( self->interf->get().present() ?
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), taskID) ) : Never() ) ) { brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled), taskID) ) : Never() ) ) {
self->results = res; self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present()) if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version ); self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() ); self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@ -213,7 +216,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0); //TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void(); return Void();
} }
when( wait( self->interf->onChange() ) ) {} when( wait( self->interf->onChange() ) ) {
self->onlySpilled = false;
}
} }
} }
} catch( Error &e ) { } catch( Error &e ) {

View File

@ -1018,6 +1018,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer; rep.popped = poppedVer;
rep.end = poppedVer; rep.end = poppedVer;
rep.onlySpilled = false;
if(req.sequence.present()) { if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId]; auto& trackerData = self->peekTracker[peekId];
@ -1044,6 +1045,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
state Version endVersion = logData->version.get() + 1; state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
//grab messages from disk //grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@ -1053,7 +1055,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory // SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result? // SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
peekMessagesFromMemory( logData, req, messages2, endVersion ); if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
Standalone<VectorRef<KeyValueRef>> kvs = wait( Standalone<VectorRef<KeyValueRef>> kvs = wait(
self->persistentData->readRange(KeyRangeRef( self->persistentData->readRange(KeyRangeRef(
@ -1068,10 +1074,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages.serializeBytes(kv.value); messages.serializeBytes(kv.value);
} }
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() ); messages.serializeBytes( messages2.toValue() );
}
} else { } else {
peekMessagesFromMemory( logData, req, messages, endVersion ); peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
@ -1082,6 +1090,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue(); reply.messages = messages.toValue();
reply.end = endVersion; reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address); //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);

View File

@ -150,10 +150,11 @@ struct TLogPeekReply {
Version maxKnownVersion; Version maxKnownVersion;
Version minKnownCommittedVersion; Version minKnownCommittedVersion;
Optional<Version> begin; Optional<Version> begin;
bool onlySpilled;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin); serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin, onlySpilled);
} }
}; };
@ -163,15 +164,16 @@ struct TLogPeekRequest {
Version begin; Version begin;
Tag tag; Tag tag;
bool returnIfBlocked; bool returnIfBlocked;
bool onlySpilled;
Optional<std::pair<UID, int>> sequence; Optional<std::pair<UID, int>> sequence;
ReplyPromise<TLogPeekReply> reply; ReplyPromise<TLogPeekReply> reply;
TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence) {} TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, bool onlySpilled, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence), onlySpilled(onlySpilled) {}
TLogPeekRequest() {} TLogPeekRequest() {}
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, arena, begin, tag, returnIfBlocked, sequence, reply); serializer(ar, arena, begin, tag, returnIfBlocked, onlySpilled, sequence, reply);
} }
}; };

View File

@ -1304,6 +1304,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer; rep.popped = poppedVer;
rep.end = poppedVer; rep.end = poppedVer;
rep.onlySpilled = false;
if(req.sequence.present()) { if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId]; auto& trackerData = self->peekTracker[peekId];
@ -1330,6 +1331,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
state Version endVersion = logData->version.get() + 1; state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
//grab messages from disk //grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@ -1339,7 +1341,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory // SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result? // SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
peekMessagesFromMemory( logData, req, messages2, endVersion ); if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
if (req.tag == txsTag) { if (req.tag == txsTag) {
Standalone<VectorRef<KeyValueRef>> kvs = wait( Standalone<VectorRef<KeyValueRef>> kvs = wait(
@ -1353,10 +1359,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages.serializeBytes(kv.value); messages.serializeBytes(kv.value);
} }
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() ); messages.serializeBytes( messages2.toValue() );
}
} else { } else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait( Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
@ -1433,13 +1441,20 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messageReads.clear(); messageReads.clear();
memoryReservation.release(); memoryReservation.release();
if (earlyEnd) if (earlyEnd) {
endVersion = lastRefMessageVersion + 1; endVersion = lastRefMessageVersion + 1;
else onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() ); messages.serializeBytes( messages2.toValue() );
}
} }
} else { } else {
peekMessagesFromMemory( logData, req, messages, endVersion ); if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
} }
@ -1448,6 +1463,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue(); reply.messages = messages.toValue();
reply.end = endVersion; reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());