add TLog streaming peek to OldTLogServer_x_x

This commit is contained in:
Xiaoxi Wang 2021-07-21 13:23:16 -07:00
parent 1057835e8b
commit 68b08a3224
3 changed files with 478 additions and 1179 deletions

View File

@ -479,7 +479,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
});
specialCounter(
cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; });
specialCounter(cc, "ActivePeekStreams", [tLogData]() {return tLogData->activePeekStreams;});
specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; });
}
~LogData() {
@ -1157,7 +1157,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
/*try {
try {
TLogPeekReply reply =
wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
req.reply.send(reply);
@ -1171,158 +1171,6 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
}
return Void();*/
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state OldTag oldTag = convertTag(req.tag);
if (req.sequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
self->peekTracker.find(peekId) == self->peekTracker.end()) {
throw operation_obsolete();
}
if (sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = std::max(ver, req.begin);
wait(yield());
}
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
state Version endVersion = logData->version.get() + 1;
Version poppedVer = poppedVersion(logData, oldTag);
if (poppedVer > req.begin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = 0;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(rep.end);
}
rep.begin = req.begin;
}
req.reply.send(rep);
return Void();
}
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, req.begin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << int32_t(-1) << ver;
BinaryReader rd(kv.value, Unversioned());
while (!rd.empty()) {
int32_t messageLength;
uint32_t subVersion;
rd >> messageLength >> subVersion;
messageLength += sizeof(uint16_t) + sizeof(Tag);
messages << messageLength << subVersion << uint16_t(1) << req.tag;
messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag));
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
}
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes(messages2.toValue());
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = 0;
reply.onlySpilled = false;
reply.messages = messages.toValue();
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(reply.end);
}
reply.begin = req.begin;
}
req.reply.send(reply);
return Void();
}

View File

@ -574,7 +574,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
});
specialCounter(
cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; });
specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams;});
specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; });
}
~LogData() {
@ -1480,7 +1480,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
/*try {
try {
TLogPeekReply reply =
wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
req.reply.send(reply);
@ -1494,236 +1494,6 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
}
return Void();*/
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
}
if (req.sequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
while (trackerData.sequence_version.size() &&
seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if (seqBegin->second.canBeSet()) {
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw operation_obsolete();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
if (fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if (t > trackerData.idleMax)
trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
state double blockStart = now();
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if (poppedVer > req.begin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = req.begin;
}
req.reply.send(rep);
return Void();
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
}
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart - queueStart;
double blockT = workStart - blockStart;
double workT = now() - workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if (queueT > trackerData.queueMax)
trackerData.queueMax = queueT;
if (blockT > trackerData.blockMax)
trackerData.blockMax = blockT;
if (workT > trackerData.workMax)
trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = req.begin;
}
req.reply.send(reply);
return Void();
}

View File

@ -663,7 +663,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; });
specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
specialCounter(cc, "ActivePeekStreams", [tLogData]() {return tLogData->activePeekStreams;});
specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; });
}
~LogData() {
@ -1443,9 +1443,11 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
}
uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion;
if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
(logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning
(tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag
if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
(logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning
(tagData->unpoppedRecovered ||
PoppedVersionLag >=
SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag
TraceEvent("TLogPopDetails", logData->logId)
.detail("Tag", tagData->tag.toString())
.detail("UpTo", upTo)
@ -1719,8 +1721,7 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, tag, begin),
KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin),
persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
@ -1760,7 +1761,7 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::TRUE));
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
@ -1910,7 +1911,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
/*try {
try {
TLogPeekReply reply =
wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
req.reply.send(reply);
@ -1924,326 +1925,6 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
}
return Void();*/
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
}
if (req.sequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while (trackerData.sequence_version.size() &&
seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if (seqBegin->second.canBeSet()) {
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw operation_obsolete();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
if (fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if (t > trackerData.idleMax)
trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
state double blockStart = now();
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (req.tag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if (poppedVer > req.begin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = req.begin;
}
req.reply.send(rep);
return Void();
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
}
if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
break;
}
if (sd.version >= req.begin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd)
break;
}
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1);
wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes));
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size())
break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4);
BinaryReader rd(queueEntryData, IncludeVersion());
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT(valid == 0x01);
ASSERT(length + sizeof(valid) == queueEntryData.size());
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
}
lastRefMessageVersion = entry.version;
index++;
}
messageReads.clear();
memoryReservation.release();
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
}
} else {
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart - queueStart;
double blockT = workStart - blockStart;
double workT = now() - workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if (queueT > trackerData.queueMax)
trackerData.queueMax = queueT;
if (blockT > trackerData.blockMax)
trackerData.blockMax = blockT;
if (workT > trackerData.workMax)
trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = req.begin;
}
req.reply.send(reply);
return Void();
}