Fix clang format
This commit is contained in:
parent
8cc9a5af1a
commit
9e5f6d8214
|
@ -1910,124 +1910,126 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
|
|||
index++;
|
||||
}
|
||||
|
||||
messageReads.clear();
|
||||
memoryReservation.release();
|
||||
messageReads.clear();
|
||||
memoryReservation.release();
|
||||
|
||||
if (earlyEnd) {
|
||||
endVersion = lastRefMessageVersion + 1;
|
||||
onlySpilled = true;
|
||||
} else {
|
||||
messages.serializeBytes(messages2.toValue());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (reqOnlySpilled) {
|
||||
endVersion = logData->persistentDataDurableVersion + 1;
|
||||
if (earlyEnd) {
|
||||
endVersion = lastRefMessageVersion + 1;
|
||||
onlySpilled = true;
|
||||
} else {
|
||||
messages.serializeBytes(messages2.toValue());
|
||||
peekCount += memPeekCount;
|
||||
}
|
||||
|
||||
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
if (reqOnlySpilled) {
|
||||
endVersion = logData->persistentDataDurableVersion + 1;
|
||||
} else {
|
||||
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion, memPeekCount);
|
||||
messages.serializeBytes(messages2.toValue());
|
||||
peekCount += memPeekCount;
|
||||
}
|
||||
|
||||
state Version waitUntilVersion = logData->version.get() + 1;
|
||||
|
||||
// Currently, from `reqBegin` to logData->version are all empty peeks. Wait for more versions, or the empty
|
||||
// batching interval has expired.
|
||||
wait(logData->version.whenAtLeast(waitUntilVersion) ||
|
||||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - blockStart)));
|
||||
if (logData->version.get() < waitUntilVersion) {
|
||||
break; // We know that from `reqBegin` to logData->version are all empty messages. Skip re-executing the
|
||||
// peek logic.
|
||||
}
|
||||
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
|
||||
}
|
||||
|
||||
TLogPeekReply reply;
|
||||
reply.maxKnownVersion = logData->version.get();
|
||||
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
||||
reply.messages = StringRef(reply.arena, messages.toValue());
|
||||
reply.end = endVersion;
|
||||
reply.onlySpilled = onlySpilled;
|
||||
|
||||
// Update metrics
|
||||
if (reply.messages.size() == 0) {
|
||||
++logData->emptyPeeks;
|
||||
}
|
||||
else {
|
||||
if (reqOnlySpilled) {
|
||||
endVersion = logData->persistentDataDurableVersion + 1;
|
||||
} else {
|
||||
++logData->nonEmptyPeeks;
|
||||
|
||||
// TODO (version vector) check if this should be included in "status details" json
|
||||
if (logData->peekVersionCounts.find(reqTag) == logData->peekVersionCounts.end()) {
|
||||
UID ssID = nondeterministicRandom()->randomUniqueID();
|
||||
std::string s = "PeekVersionCounts-" + reqTag.toString();
|
||||
logData->peekVersionCounts.try_emplace(
|
||||
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
|
||||
}
|
||||
LatencySample& sample = logData->peekVersionCounts.at(reqTag);
|
||||
sample.addMeasurement(peekCount);
|
||||
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion, memPeekCount);
|
||||
peekCount += memPeekCount;
|
||||
}
|
||||
|
||||
// TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()).
|
||||
// detail("BeginVer", reqBegin).detail("EndVer", reply.end).
|
||||
// detail("MsgBytes", reply.messages.expectedSize()).
|
||||
// detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
|
||||
state Version waitUntilVersion = logData->version.get() + 1;
|
||||
|
||||
if (reqSequence.present()) {
|
||||
auto& trackerData = logData->peekTracker[peekId];
|
||||
trackerData.lastUpdate = now();
|
||||
// Currently, from `reqBegin` to logData->version are all empty peeks. Wait for more versions, or the empty
|
||||
// batching interval has expired.
|
||||
wait(logData->version.whenAtLeast(waitUntilVersion) ||
|
||||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - blockStart)));
|
||||
if (logData->version.get() < waitUntilVersion) {
|
||||
break; // We know that from `reqBegin` to logData->version are all empty messages. Skip re-executing the
|
||||
// peek logic.
|
||||
}
|
||||
}
|
||||
|
||||
double queueT = blockStart - queueStart;
|
||||
double blockT = workStart - blockStart;
|
||||
double workT = now() - workStart;
|
||||
TLogPeekReply reply;
|
||||
reply.maxKnownVersion = logData->version.get();
|
||||
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
|
||||
reply.messages = StringRef(reply.arena, messages.toValue());
|
||||
reply.end = endVersion;
|
||||
reply.onlySpilled = onlySpilled;
|
||||
|
||||
trackerData.totalPeeks++;
|
||||
trackerData.replyBytes += reply.messages.size();
|
||||
// Update metrics
|
||||
if (reply.messages.size() == 0) {
|
||||
++logData->emptyPeeks;
|
||||
} else {
|
||||
++logData->nonEmptyPeeks;
|
||||
|
||||
if (queueT > trackerData.queueMax)
|
||||
trackerData.queueMax = queueT;
|
||||
if (blockT > trackerData.blockMax)
|
||||
trackerData.blockMax = blockT;
|
||||
if (workT > trackerData.workMax)
|
||||
trackerData.workMax = workT;
|
||||
// TODO (version vector) check if this should be included in "status details" json
|
||||
if (logData->peekVersionCounts.find(reqTag) == logData->peekVersionCounts.end()) {
|
||||
UID ssID = nondeterministicRandom()->randomUniqueID();
|
||||
std::string s = "PeekVersionCounts-" + reqTag.toString();
|
||||
logData->peekVersionCounts.try_emplace(
|
||||
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
|
||||
}
|
||||
LatencySample& sample = logData->peekVersionCounts.at(reqTag);
|
||||
sample.addMeasurement(peekCount);
|
||||
}
|
||||
|
||||
trackerData.queueTime += queueT;
|
||||
trackerData.blockTime += blockT;
|
||||
trackerData.workTime += workT;
|
||||
// TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()).
|
||||
// detail("BeginVer", reqBegin).detail("EndVer", reply.end).
|
||||
// detail("MsgBytes", reply.messages.expectedSize()).
|
||||
// detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
|
||||
|
||||
auto& sequenceData = trackerData.sequence_version[sequence + 1];
|
||||
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
|
||||
if (reqSequence.present()) {
|
||||
auto& trackerData = logData->peekTracker[peekId];
|
||||
trackerData.lastUpdate = now();
|
||||
|
||||
double queueT = blockStart - queueStart;
|
||||
double blockT = workStart - blockStart;
|
||||
double workT = now() - workStart;
|
||||
|
||||
trackerData.totalPeeks++;
|
||||
trackerData.replyBytes += reply.messages.size();
|
||||
|
||||
if (queueT > trackerData.queueMax)
|
||||
trackerData.queueMax = queueT;
|
||||
if (blockT > trackerData.blockMax)
|
||||
trackerData.blockMax = blockT;
|
||||
if (workT > trackerData.workMax)
|
||||
trackerData.workMax = workT;
|
||||
|
||||
trackerData.queueTime += queueT;
|
||||
trackerData.blockTime += blockT;
|
||||
trackerData.workTime += workT;
|
||||
|
||||
auto& sequenceData = trackerData.sequence_version[sequence + 1];
|
||||
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
|
||||
replyPromise.sendError(operation_obsolete());
|
||||
if (!sequenceData.isSet()) {
|
||||
// It would technically be more correct to .send({reqBegin, reqOnlySpilled}), as the next
|
||||
// request might still be in the window of active requests, but LogSystemPeekCursor will
|
||||
// throw away all future responses upon getting an operation_obsolete(), so computing a
|
||||
// response will probably be a waste of CPU.
|
||||
sequenceData.sendError(operation_obsolete());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
if (sequenceData.isSet()) {
|
||||
trackerData.duplicatePeeks++;
|
||||
if (sequenceData.getFuture().get().first != reply.end) {
|
||||
TEST(true); // tlog peek second attempt ended at a different version (2)
|
||||
replyPromise.sendError(operation_obsolete());
|
||||
if (!sequenceData.isSet()) {
|
||||
// It would technically be more correct to .send({reqBegin, reqOnlySpilled}), as the next
|
||||
// request might still be in the window of active requests, but LogSystemPeekCursor will
|
||||
// throw away all future responses upon getting an operation_obsolete(), so computing a
|
||||
// response will probably be a waste of CPU.
|
||||
sequenceData.sendError(operation_obsolete());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
if (sequenceData.isSet()) {
|
||||
trackerData.duplicatePeeks++;
|
||||
if (sequenceData.getFuture().get().first != reply.end) {
|
||||
TEST(true); // tlog peek second attempt ended at a different version (2)
|
||||
replyPromise.sendError(operation_obsolete());
|
||||
return Void();
|
||||
}
|
||||
} else {
|
||||
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
|
||||
}
|
||||
reply.begin = reqBegin;
|
||||
} else {
|
||||
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
|
||||
}
|
||||
reply.begin = reqBegin;
|
||||
}
|
||||
|
||||
replyPromise.send(reply);
|
||||
return Void();
|
||||
replyPromise.send(reply);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
|
||||
|
|
Loading…
Reference in New Issue