Merge pull request #1584 from alexmiller-apple/spilled-only-peek

Save TLog resources by letting peek request only spilled data.
This commit is contained in:
Evan Tschannen 2019-06-20 18:22:31 -07:00 committed by GitHub
commit 1c005d5878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 20 deletions

View File

@ -357,6 +357,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
reply.messages = messages.toValue();
reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
reply.end = endVersion;
reply.onlySpilled = false;
req.reply.send( reply );
//TraceEvent("LogRouterPeek4", self->dbgid);

View File

@ -340,7 +340,7 @@ struct ILogSystem {
virtual void advanceTo(LogMessageVersion n) = 0;
//returns immediately if hasMessage() returns true.
//returns when either the result of hasMessage() or version() has changed.
//returns when either the result of hasMessage() or version() has changed, or a cursor has internally been exhausted.
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply) = 0;
//returns when the failure monitor detects that the servers associated with the cursor are failed
@ -388,6 +388,7 @@ struct ILogSystem {
UID randomID;
bool returnIfBlocked;
bool onlySpilled;
bool parallelGetMore;
int sequence;
Deque<Future<TLogPeekReply>> futureResults;

View File

@ -25,14 +25,14 @@
#include "flow/actorcompiler.h" // has to be last include
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) {
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag )
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), parallelGetMore(false)
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false)
{
//TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0;
@ -146,8 +146,12 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
loop {
state Version expectedBegin = self->messageVersion.version;
try {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
if (self->parallelGetMore || self->onlySpilled) {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
} else if (self->futureResults.size() == 0) {
return Void();
}
choose {
@ -158,6 +162,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
expectedBegin = res.end;
self->futureResults.pop_front();
self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@ -172,6 +177,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
self->interfaceChanged = self->interf->onChange();
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
self->onlySpilled = false;
self->futureResults.clear();
}
}
@ -201,8 +207,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
loop {
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ?
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), taskID) ) : Never() ) ) {
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled), taskID) ) : Never() ) ) {
self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
@ -213,7 +220,9 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int ta
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when( wait( self->interf->onChange() ) ) {}
when( wait( self->interf->onChange() ) ) {
self->onlySpilled = false;
}
}
}
} catch( Error &e ) {
@ -230,7 +239,11 @@ Future<Void> ILogSystem::ServerPeekCursor::getMore(int taskID) {
if( hasMessage() )
return Void();
if( !more.isValid() || more.isReady() ) {
more = parallelGetMore ? serverPeekParallelGetMore(this, taskID) : serverPeekGetMore(this, taskID);
if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
} else {
more = serverPeekGetMore(this, taskID);
}
}
return more;
}

View File

@ -1078,6 +1078,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
@ -1104,6 +1105,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
//grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@ -1113,7 +1115,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
peekMessagesFromMemory( logData, req, messages2, endVersion );
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
Standalone<VectorRef<KeyValueRef>> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
@ -1128,10 +1134,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() );
}
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
@ -1142,6 +1150,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);

View File

@ -150,10 +150,11 @@ struct TLogPeekReply {
Version maxKnownVersion;
Version minKnownCommittedVersion;
Optional<Version> begin;
bool onlySpilled;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin);
serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin, onlySpilled);
}
};
@ -163,15 +164,16 @@ struct TLogPeekRequest {
Version begin;
Tag tag;
bool returnIfBlocked;
bool onlySpilled;
Optional<std::pair<UID, int>> sequence;
ReplyPromise<TLogPeekReply> reply;
TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence) {}
TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, bool onlySpilled, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence), onlySpilled(onlySpilled) {}
TLogPeekRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, begin, tag, returnIfBlocked, sequence, reply);
serializer(ar, arena, begin, tag, returnIfBlocked, onlySpilled, sequence, reply);
}
};

View File

@ -1365,6 +1365,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
@ -1391,6 +1392,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
//grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
@ -1400,7 +1402,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory
// SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result?
peekMessagesFromMemory( logData, req, messages2, endVersion );
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
if (req.tag == txsTag) {
Standalone<VectorRef<KeyValueRef>> kvs = wait(
@ -1414,10 +1420,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() );
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
@ -1494,13 +1502,20 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messageReads.clear();
memoryReservation.release();
if (earlyEnd)
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
else
onlySpilled = true;
} else {
messages.serializeBytes( messages2.toValue() );
}
}
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
@ -1509,6 +1524,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());