refactor TLog Peek code
This commit is contained in:
parent
b50fda6b4b
commit
9948b9d4ef
|
@ -1507,15 +1507,16 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>>& getVersionMessages(Refe
|
||||||
};
|
};
|
||||||
|
|
||||||
void peekMessagesFromMemory(Reference<LogData> self,
|
void peekMessagesFromMemory(Reference<LogData> self,
|
||||||
TLogPeekRequest const& req,
|
Tag tag,
|
||||||
|
Version begin,
|
||||||
BinaryWriter& messages,
|
BinaryWriter& messages,
|
||||||
Version& endVersion) {
|
Version& endVersion) {
|
||||||
ASSERT(!messages.getLength());
|
ASSERT(!messages.getLength());
|
||||||
|
|
||||||
auto& deque = getVersionMessages(self, req.tag);
|
auto& deque = getVersionMessages(self, tag);
|
||||||
//TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
|
//TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
|
||||||
|
|
||||||
Version begin = std::max(req.begin, self->persistentDataDurableVersion + 1);
|
begin = std::max(begin, self->persistentDataDurableVersion + 1);
|
||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
|
@ -1542,7 +1543,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
||||||
DEBUG_TAGS_AND_MESSAGE(
|
DEBUG_TAGS_AND_MESSAGE(
|
||||||
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset))
|
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset))
|
||||||
.detail("LogId", self->logId)
|
.detail("LogId", self->logId)
|
||||||
.detail("PeekTag", req.tag);
|
.detail("PeekTag", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1568,105 +1569,97 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T
|
||||||
return relevantMessages;
|
return relevantMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
|
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
|
||||||
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
|
ACTOR Future<TLogPeekReply> peekTLog(
|
||||||
req.tag.id = req.tag.id % logData->txsTags;
|
TLogData* self,
|
||||||
}
|
Reference<LogData> logData,
|
||||||
req.reply.setByteLimit(SERVER_KNOBS->MAXIMUM_PEEK_BYTES);
|
Version begin,
|
||||||
return Void();
|
Tag tag,
|
||||||
}
|
bool returnIfBlocked = false,
|
||||||
|
bool reqOnlySpilled = false,
|
||||||
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
|
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
|
||||||
state BinaryWriter messages(Unversioned());
|
state BinaryWriter messages(Unversioned());
|
||||||
state BinaryWriter messages2(Unversioned());
|
state BinaryWriter messages2(Unversioned());
|
||||||
state int sequence = -1;
|
state int sequenceNum = -1;
|
||||||
state UID peekId;
|
state UID peekId;
|
||||||
state double queueStart = now();
|
state double queueStart = now();
|
||||||
|
|
||||||
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
|
if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) {
|
||||||
req.tag.id = req.tag.id % logData->txsTags;
|
tag.id = tag.id % logData->txsTags;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.sequence.present()) {
|
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests
|
||||||
try {
|
// STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order
|
||||||
peekId = req.sequence.get().first;
|
if (sequence.present()) {
|
||||||
sequence = req.sequence.get().second;
|
peekId = sequence.get().first;
|
||||||
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
|
sequenceNum = sequence.get().second;
|
||||||
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
|
||||||
throw operation_obsolete();
|
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
||||||
}
|
throw operation_obsolete();
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
|
||||||
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
|
||||||
trackerData.tag = req.tag;
|
|
||||||
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
|
|
||||||
}
|
|
||||||
auto seqBegin = trackerData.sequence_version.begin();
|
|
||||||
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
|
||||||
while (trackerData.sequence_version.size() &&
|
|
||||||
seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
|
||||||
if (seqBegin->second.canBeSet()) {
|
|
||||||
seqBegin->second.sendError(operation_obsolete());
|
|
||||||
}
|
|
||||||
trackerData.sequence_version.erase(seqBegin);
|
|
||||||
seqBegin = trackerData.sequence_version.begin();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
|
||||||
throw operation_obsolete();
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
|
|
||||||
if (fPrevPeekData.isReady()) {
|
|
||||||
trackerData.unblockedPeeks++;
|
|
||||||
double t = now() - trackerData.lastUpdate;
|
|
||||||
if (t > trackerData.idleMax)
|
|
||||||
trackerData.idleMax = t;
|
|
||||||
trackerData.idleTime += t;
|
|
||||||
}
|
|
||||||
trackerData.lastUpdate = now();
|
|
||||||
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
|
|
||||||
req.begin = std::max(prevPeekData.first, req.begin);
|
|
||||||
req.onlySpilled = prevPeekData.second;
|
|
||||||
wait(yield());
|
|
||||||
} catch (Error& e) {
|
|
||||||
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
|
||||||
req.reply.sendError(e);
|
|
||||||
return Void();
|
|
||||||
} else {
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
|
if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
||||||
|
trackerData.tag = tag;
|
||||||
|
trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled));
|
||||||
|
}
|
||||||
|
auto seqBegin = trackerData.sequence_version.begin();
|
||||||
|
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
||||||
|
while (trackerData.sequence_version.size() &&
|
||||||
|
seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
||||||
|
if (seqBegin->second.canBeSet()) {
|
||||||
|
seqBegin->second.sendError(operation_obsolete());
|
||||||
|
}
|
||||||
|
trackerData.sequence_version.erase(seqBegin);
|
||||||
|
seqBegin = trackerData.sequence_version.begin();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) {
|
||||||
|
throw operation_obsolete();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture();
|
||||||
|
if (fPrevPeekData.isReady()) {
|
||||||
|
trackerData.unblockedPeeks++;
|
||||||
|
double t = now() - trackerData.lastUpdate;
|
||||||
|
if (t > trackerData.idleMax)
|
||||||
|
trackerData.idleMax = t;
|
||||||
|
trackerData.idleTime += t;
|
||||||
|
}
|
||||||
|
trackerData.lastUpdate = now();
|
||||||
|
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
|
||||||
|
begin = std::max(prevPeekData.first, begin);
|
||||||
|
reqOnlySpilled = prevPeekData.second;
|
||||||
|
wait(yield());
|
||||||
}
|
}
|
||||||
|
|
||||||
state double blockStart = now();
|
state double blockStart = now();
|
||||||
|
|
||||||
if (req.returnIfBlocked && logData->version.get() < req.begin) {
|
if (returnIfBlocked && logData->version.get() < begin) {
|
||||||
req.reply.sendError(end_of_stream());
|
if (sequence.present()) {
|
||||||
if (req.sequence.present()) {
|
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence + 1];
|
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if (!sequenceData.isSet()) {
|
if (!sequenceData.isSet()) {
|
||||||
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
|
sequenceData.send(std::make_pair(begin, reqOnlySpilled));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Void();
|
throw end_of_stream();
|
||||||
}
|
}
|
||||||
|
|
||||||
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
|
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
|
||||||
// Wait until we have something to return that the caller doesn't already have
|
// Wait until we have something to return that the caller doesn't already have
|
||||||
if (logData->version.get() < req.begin) {
|
if (logData->version.get() < begin) {
|
||||||
wait(logData->version.whenAtLeast(req.begin));
|
wait(logData->version.whenAtLeast(begin));
|
||||||
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
|
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) {
|
if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) {
|
||||||
wait(self->concurrentLogRouterReads.take());
|
wait(self->concurrentLogRouterReads.take());
|
||||||
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
|
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
|
||||||
wait(delay(0.0, TaskPriority::Low));
|
wait(delay(0.0, TaskPriority::Low));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
|
if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) {
|
||||||
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
|
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
|
||||||
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
|
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
|
||||||
// slightly faster over keeping the rest of the cluster operating normally.
|
// slightly faster over keeping the rest of the cluster operating normally.
|
||||||
|
@ -1677,8 +1670,9 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
|
|
||||||
state double workStart = now();
|
state double workStart = now();
|
||||||
|
|
||||||
Version poppedVer = poppedVersion(logData, req.tag);
|
Version poppedVer = poppedVersion(logData, tag);
|
||||||
if (poppedVer > req.begin) {
|
if (poppedVer > begin) {
|
||||||
|
// reply with an empty message and let the next reply start from poppedVer
|
||||||
TLogPeekReply rep;
|
TLogPeekReply rep;
|
||||||
rep.maxKnownVersion = logData->version.get();
|
rep.maxKnownVersion = logData->version.get();
|
||||||
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
||||||
|
@ -1686,30 +1680,28 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
rep.end = poppedVer;
|
rep.end = poppedVer;
|
||||||
rep.onlySpilled = false;
|
rep.onlySpilled = false;
|
||||||
|
|
||||||
if (req.sequence.present()) {
|
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests.
|
||||||
|
if (sequence.present()) {
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence + 1];
|
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
|
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(operation_obsolete());
|
|
||||||
if (!sequenceData.isSet())
|
if (!sequenceData.isSet())
|
||||||
sequenceData.sendError(operation_obsolete());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
if (sequenceData.isSet()) {
|
if (sequenceData.isSet()) {
|
||||||
if (sequenceData.getFuture().get().first != rep.end) {
|
if (sequenceData.getFuture().get().first != rep.end) {
|
||||||
TEST(true); // tlog peek second attempt ended at a different version
|
TEST(true); // 1 tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(operation_obsolete());
|
throw operation_obsolete();
|
||||||
return Void();
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
|
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
|
||||||
}
|
}
|
||||||
rep.begin = req.begin;
|
rep.begin = begin;
|
||||||
}
|
}
|
||||||
|
|
||||||
req.reply.send(rep);
|
return rep;
|
||||||
return Void();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
state Version endVersion = logData->version.get() + 1;
|
state Version endVersion = logData->version.get() + 1;
|
||||||
|
@ -1717,23 +1709,23 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
|
|
||||||
// grab messages from disk
|
// grab messages from disk
|
||||||
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
|
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
|
||||||
if (req.begin <= logData->persistentDataDurableVersion) {
|
if (begin <= logData->persistentDataDurableVersion) {
|
||||||
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
|
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
|
||||||
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
|
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
|
||||||
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
|
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
|
||||||
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
|
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
|
||||||
// result?
|
// result?
|
||||||
|
|
||||||
if (req.onlySpilled) {
|
if (reqOnlySpilled) {
|
||||||
endVersion = logData->persistentDataDurableVersion + 1;
|
endVersion = logData->persistentDataDurableVersion + 1;
|
||||||
} else {
|
} else {
|
||||||
peekMessagesFromMemory(logData, req, messages2, endVersion);
|
peekMessagesFromMemory(logData, tag, begin, messages2, endVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logData->shouldSpillByValue(req.tag)) {
|
if (logData->shouldSpillByValue(tag)) {
|
||||||
RangeResult kvs = wait(self->persistentData->readRange(
|
RangeResult kvs = wait(self->persistentData->readRange(
|
||||||
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
|
KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin),
|
||||||
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
|
persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
|
||||||
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
|
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
|
||||||
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
|
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
|
||||||
|
|
||||||
|
@ -1752,9 +1744,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
} else {
|
} else {
|
||||||
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
|
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
|
||||||
RangeResult kvrefs = wait(self->persistentData->readRange(
|
RangeResult kvrefs = wait(self->persistentData->readRange(
|
||||||
KeyRangeRef(
|
KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin),
|
||||||
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
|
persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
|
||||||
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
|
|
||||||
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
|
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
|
||||||
|
|
||||||
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
|
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
|
||||||
|
@ -1774,7 +1765,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
earlyEnd = true;
|
earlyEnd = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (sd.version >= req.begin) {
|
if (sd.version >= begin) {
|
||||||
firstVersion = std::min(firstVersion, sd.version);
|
firstVersion = std::min(firstVersion, sd.version);
|
||||||
const IDiskQueue::location end = sd.start.lo + sd.length;
|
const IDiskQueue::location end = sd.start.lo + sd.length;
|
||||||
commitLocations.emplace_back(sd.start, end);
|
commitLocations.emplace_back(sd.start, end);
|
||||||
|
@ -1816,13 +1807,13 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
messages << VERSION_HEADER << entry.version;
|
messages << VERSION_HEADER << entry.version;
|
||||||
|
|
||||||
std::vector<StringRef> rawMessages =
|
std::vector<StringRef> rawMessages =
|
||||||
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
|
wait(parseMessagesForTag(entry.messages, tag, logData->logRouterTags));
|
||||||
for (const StringRef& msg : rawMessages) {
|
for (const StringRef& msg : rawMessages) {
|
||||||
messages.serializeBytes(msg);
|
messages.serializeBytes(msg);
|
||||||
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
|
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
|
||||||
.detail("UID", self->dbgid)
|
.detail("UID", self->dbgid)
|
||||||
.detail("LogId", logData->logId)
|
.detail("LogId", logData->logId)
|
||||||
.detail("PeekTag", req.tag);
|
.detail("PeekTag", tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
lastRefMessageVersion = entry.version;
|
lastRefMessageVersion = entry.version;
|
||||||
|
@ -1840,10 +1831,10 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (req.onlySpilled) {
|
if (reqOnlySpilled) {
|
||||||
endVersion = logData->persistentDataDurableVersion + 1;
|
endVersion = logData->persistentDataDurableVersion + 1;
|
||||||
} else {
|
} else {
|
||||||
peekMessagesFromMemory(logData, req, messages, endVersion);
|
peekMessagesFromMemory(logData, tag, begin, messages, endVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
|
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
|
||||||
|
@ -1852,7 +1843,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
TLogPeekReply reply;
|
TLogPeekReply reply;
|
||||||
reply.maxKnownVersion = logData->version.get();
|
reply.maxKnownVersion = logData->version.get();
|
||||||
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
||||||
reply.messages = messages.toValue();
|
reply.messages = StringRef(reply.arena, messages.toValue());
|
||||||
reply.end = endVersion;
|
reply.end = endVersion;
|
||||||
reply.onlySpilled = onlySpilled;
|
reply.onlySpilled = onlySpilled;
|
||||||
|
|
||||||
|
@ -1861,7 +1852,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
// detail("MsgBytes", reply.messages.expectedSize()).
|
// detail("MsgBytes", reply.messages.expectedSize()).
|
||||||
// detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
|
// detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
|
||||||
|
|
||||||
if (req.sequence.present()) {
|
if (sequence.present()) {
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
|
|
||||||
|
@ -1883,9 +1874,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
trackerData.blockTime += blockT;
|
trackerData.blockTime += blockT;
|
||||||
trackerData.workTime += workT;
|
trackerData.workTime += workT;
|
||||||
|
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence + 1];
|
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
|
||||||
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
|
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(operation_obsolete());
|
|
||||||
if (!sequenceData.isSet()) {
|
if (!sequenceData.isSet()) {
|
||||||
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
|
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
|
||||||
// request might still be in the window of active requests, but LogSystemPeekCursor will
|
// request might still be in the window of active requests, but LogSystemPeekCursor will
|
||||||
|
@ -1893,22 +1883,53 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
||||||
// response will probably be a waste of CPU.
|
// response will probably be a waste of CPU.
|
||||||
sequenceData.sendError(operation_obsolete());
|
sequenceData.sendError(operation_obsolete());
|
||||||
}
|
}
|
||||||
return Void();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
if (sequenceData.isSet()) {
|
if (sequenceData.isSet()) {
|
||||||
trackerData.duplicatePeeks++;
|
trackerData.duplicatePeeks++;
|
||||||
if (sequenceData.getFuture().get().first != reply.end) {
|
if (sequenceData.getFuture().get().first != reply.end) {
|
||||||
TEST(true); // tlog peek second attempt ended at a different version (2)
|
TEST(true); // 1 tlog peek second attempt ended at a different version (2)
|
||||||
req.reply.sendError(operation_obsolete());
|
throw operation_obsolete();
|
||||||
return Void();
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
|
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
|
||||||
}
|
}
|
||||||
reply.begin = req.begin;
|
reply.begin = begin;
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
|
||||||
|
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
|
||||||
|
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
|
||||||
|
req.tag.id = req.tag.id % logData->txsTags;
|
||||||
|
}
|
||||||
|
req.reply.setByteLimit(SERVER_KNOBS->MAXIMUM_PEEK_BYTES);
|
||||||
|
// loop { wait(req.reply.onReady()); };
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
|
||||||
|
state BinaryWriter messages(Unversioned());
|
||||||
|
state BinaryWriter messages2(Unversioned());
|
||||||
|
state int sequence = -1;
|
||||||
|
state UID peekId;
|
||||||
|
state double queueStart = now();
|
||||||
|
|
||||||
|
try {
|
||||||
|
TLogPeekReply reply = wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
|
||||||
|
req.reply.send(reply);
|
||||||
|
} catch (Error& e) {
|
||||||
|
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete ||
|
||||||
|
e.code() == error_code_end_of_stream) {
|
||||||
|
req.reply.sendError(e);
|
||||||
|
return Void();
|
||||||
|
} else {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
req.reply.send(reply);
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue