clean 100k simulation test. revert changes of fdbrpc.h

This commit is contained in:
Xiaoxi Wang 2021-07-31 09:07:53 -07:00
parent 517ff9801d
commit 2a88033800
7 changed files with 376 additions and 1406 deletions

View File

@ -328,16 +328,14 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
NetNotifiedQueueWithAcknowledgements(int futures, int promises, const Endpoint& remoteEndpoint)
: NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {
// A ReplyPromiseStream will be terminated on the server side if the network connection with the client breaks
acknowledgements.failures =
tagError<Void>(makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(remoteEndpoint),
acknowledgements.failures = tagError<Void>(
makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnect(remoteEndpoint.getPrimaryAddress()),
operation_obsolete());
}
void destroy() override { delete this; }
void receive(ArenaObjectReader& reader) override {
this->addPromiseRef();
// TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceive")
// .detail("PromiseRef", this->getPromiseReferenceCount());
ErrorOr<EnsureTable<T>> message;
reader.deserialize(message);
@ -371,8 +369,6 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
this->send(std::move(message.get().asUnderlyingType()));
}
this->delPromiseRef();
// TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceiveEnd")
// .detail("PromiseRef", this->getPromiseReferenceCount());
}
T pop() override {
@ -698,19 +694,21 @@ public:
template <class X>
ReplyPromiseStream<REPLYSTREAM_TYPE(X)> getReplyStream(const X& value) const {
auto p = getReplyPromiseStream(value);
if (queue->isRemoteEndpoint()) {
Future<Void> disc =
makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint());
auto& p = getReplyPromiseStream(value);
Reference<Peer> peer =
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
// FIXME: defer sending the message until we know the connection is established
endStreamOnDisconnect(disc, p, getEndpoint(), peer);
return p;
} else {
send(value);
}
auto& p = getReplyPromiseStream(value);
return p;
}
}
// stream.getReplyUnlessFailedFor( request, double sustainedFailureDuration, double sustainedFailureSlope )
// Reliable at least once delivery: Like getReply, delivers request at least once and returns one of the replies.

View File

@ -815,8 +815,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
// Wait for a durable quorum of servers in destServers to have keys available (readWrite)
// They must also have at least the transaction read version so they can't "forget" the shard
// between
// now and when this transaction commits.
// between now and when this transaction commits.
state vector<Future<Void>> serverReady; // only for count below
state vector<Future<Void>> tssReady; // for waiting in parallel with tss
state vector<StorageServerInterface> tssReadyInterfs;

View File

@ -975,51 +975,62 @@ void peekMessagesFromMemory(Reference<LogData> self,
}
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
ACTOR template <typename PromiseType>
Future<Void> tLogPeekMessages(PromiseType replyPromise,
TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequenceNum = -1;
state int sequence = -1;
state UID peekId;
state double queueStart = now();
state OldTag oldTag = convertTag(tag);
state OldTag oldTag = convertTag(reqTag);
if (sequence.present()) {
peekId = sequence.get().first;
sequenceNum = sequence.get().second;
if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
if (reqSequence.present()) {
try {
peekId = reqSequence.get().first;
sequence = reqSequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
self->peekTracker.find(peekId) == self->peekTracker.end()) {
throw operation_obsolete();
}
if (sequenceNum > 0) {
if (sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequenceNum].getFuture());
begin = std::max(ver, begin);
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
reqBegin = std::max(ver, reqBegin);
wait(yield());
}
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
} else {
throw;
}
}
}
if (returnIfBlocked && logData->version.get() < begin) {
throw end_of_stream();
if (reqReturnIfBlocked && logData->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < begin) {
wait(logData->version.whenAtLeast(begin));
if (logData->version.get() < reqBegin) {
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
state Version endVersion = logData->version.get() + 1;
Version poppedVer = poppedVersion(logData, oldTag);
if (poppedVer > begin) {
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = 0;
@ -1027,47 +1038,50 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
rep.end = poppedVer;
rep.onlySpilled = false;
if (sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
throw operation_obsolete();
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != rep.end) {
TEST(true); // 0 tlog peek second attempt ended at a different version
throw operation_obsolete();
TEST(true); // tlog peek second attempt ended at a different version
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(rep.end);
}
rep.begin = begin;
rep.begin = reqBegin;
}
return rep;
replyPromise.send(rep);
return Void();
}
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (begin <= logData->persistentDataDurableVersion) {
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
peekMessagesFromMemory(logData, tag, begin, messages2, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, begin),
KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, reqBegin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
@ -1079,7 +1093,7 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
uint32_t subVersion;
rd >> messageLength >> subVersion;
messageLength += sizeof(uint16_t) + sizeof(Tag);
messages << messageLength << subVersion << uint16_t(1) << tag;
messages << messageLength << subVersion << uint16_t(1) << reqTag;
messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag));
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
}
@ -1090,8 +1104,8 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
else
messages.serializeBytes(messages2.toValue());
} else {
peekMessagesFromMemory(logData, tag, begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
@ -1101,40 +1115,48 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
if (sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != reply.end) {
TEST(true); // 0 tlog peek second attempt ended at a different version (2)
throw operation_obsolete();
TEST(true); // tlog peek second attempt ended at a different version (2)
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(reply.end);
}
reply.begin = begin;
reply.begin = reqBegin;
}
return reply;
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
TraceEvent(SevDebug, "TLogPeekStream", logData->logId).detail("Token", req.reply.getEndpoint().token);
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() &&
store(reply.rep, peekTLog(self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)));
tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if (future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
@ -1157,161 +1179,6 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state OldTag oldTag = convertTag(req.tag);
if (req.sequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
self->peekTracker.find(peekId) == self->peekTracker.end()) {
throw operation_obsolete();
}
if (sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = std::max(ver, req.begin);
wait(yield());
}
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
state Version endVersion = logData->version.get() + 1;
Version poppedVer = poppedVersion(logData, oldTag);
if (poppedVer > req.begin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = 0;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(rep.end);
}
rep.begin = req.begin;
}
req.reply.send(rep);
return Void();
}
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, req.begin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << int32_t(-1) << ver;
BinaryReader rd(kv.value, Unversioned());
while (!rd.empty()) {
int32_t messageLength;
uint32_t subVersion;
rd >> messageLength >> subVersion;
messageLength += sizeof(uint16_t) + sizeof(Tag);
messages << messageLength << subVersion << uint16_t(1) << req.tag;
messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag));
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
}
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes(messages2.toValue());
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = 0;
reply.onlySpilled = false;
reply.messages = messages.toValue();
reply.end = endVersion;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(reply.end);
}
reply.begin = req.begin;
}
req.reply.send(reply);
return Void();
}
ACTOR Future<Void> doQueueCommit(TLogData* self, Reference<LogData> logData) {
state Version ver = logData->version.get();
state Version commitNumber = self->queueCommitBegin + 1;
@ -1476,7 +1343,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
PromiseStream<Void> warningCollectorInput) {
loop choose {
when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) {
logData->addActor.send(tLogPeekMessages(self, req, logData));
logData->addActor.send(tLogPeekMessages(
req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
}
when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "TLogPeekStream", logData->logId)

View File

@ -1208,301 +1208,37 @@ void peekMessagesFromMemory(Reference<LogData> self,
}
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
ACTOR template <typename PromiseType>
Future<Void> tLogPeekMessages(PromiseType replyPromise,
TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequenceNum = -1;
state UID peekId;
state double queueStart = now();
if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) {
tag.id = tag.id % logData->txsTags;
}
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests
// STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order
if (sequence.present()) {
peekId = sequence.get().first;
sequenceNum = sequence.get().second;
if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = tag;
trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while (trackerData.sequence_version.size() &&
seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if (seqBegin->second.canBeSet()) {
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) {
throw operation_obsolete();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture();
if (fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if (t > trackerData.idleMax)
trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
begin = std::max(prevPeekData.first, begin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
}
state double blockStart = now();
if (returnIfBlocked && logData->version.get() < begin) {
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(begin, reqOnlySpilled));
}
}
throw end_of_stream();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < begin) {
wait(logData->version.whenAtLeast(begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, tag);
if (poppedVer > begin) {
// reply with an empty message and let the next reply start from poppedVer
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence
// requests.
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
throw operation_obsolete();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // 1 tlog peek second attempt ended at a different version
throw operation_obsolete();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = begin;
}
return rep;
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, tag, begin, messages2, endVersion);
}
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin),
persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
peekMessagesFromMemory(logData, tag, begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", reply.getEndpoint().address);
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart - queueStart;
double blockT = workStart - blockStart;
double workT = now() - workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if (queueT > trackerData.queueMax)
trackerData.queueMax = queueT;
if (blockT > trackerData.blockMax)
trackerData.blockMax = blockT;
if (workT > trackerData.workMax)
trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
if (!sequenceData.isSet()) {
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
// request might still be in the window of active requests, but LogSystemPeekCursor will
// throw away all future responses upon getting an operation_obsolete(), so computing a
// response will probably be a waste of CPU.
sequenceData.sendError(operation_obsolete());
}
throw operation_obsolete();
}
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // 1 tlog peek second attempt ended at a different version (2)
throw operation_obsolete();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = begin;
}
return reply;
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
}
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
try {
wait(req.reply.onReady() &&
store(reply.rep, peekTLog(self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)));
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) {
reqTag.id = reqTag.id % logData->txsTags;
}
if (req.sequence.present()) {
if (reqSequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
peekId = reqSequence.get().first;
sequence = reqSequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
trackerData.tag = reqTag;
trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
while (trackerData.sequence_version.size() &&
@ -1529,12 +1265,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
reqBegin = std::max(prevPeekData.first, reqBegin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
replyPromise.sendError(e);
return Void();
} else {
throw;
@ -1544,32 +1280,32 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double blockStart = now();
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
if (req.sequence.present()) {
if (reqReturnIfBlocked && logData->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
if (logData->version.get() < reqBegin) {
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) {
if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
@ -1580,8 +1316,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if (poppedVer > req.begin) {
Version poppedVer = poppedVersion(logData, reqTag);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
@ -1589,12 +1325,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
@ -1602,16 +1338,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = req.begin;
rep.begin = reqBegin;
}
req.reply.send(rep);
replyPromise.send(rep);
return Void();
}
@ -1619,27 +1355,27 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (req.onlySpilled) {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
}
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin),
persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
@ -1654,20 +1390,20 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
messages.serializeBytes(messages2.toValue());
}
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().address);
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
@ -1691,7 +1427,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
@ -1700,19 +1436,60 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = req.begin;
reply.begin = reqBegin;
}
req.reply.send(reply);
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() &&
tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if (future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> doQueueCommit(TLogData* self,
Reference<LogData> logData,
@ -2208,7 +1985,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
}
}
when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) {
logData->addActor.send(tLogPeekMessages(self, req, logData));
logData->addActor.send(tLogPeekMessages(
req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
}
when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "TLogPeekStream", logData->logId)

View File

@ -1547,414 +1547,37 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T
}
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR template<typename PromiseType>
Future<Void> peekTLogAndSend(PromiseType replyPromise, TLogData* self,
ACTOR template <typename PromiseType>
Future<Void> tLogPeekMessages(PromiseType replyPromise,
TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequenceNum = -1;
state UID peekId;
state double queueStart = now();
if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) {
tag.id = tag.id % logData->txsTags;
}
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests
// STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order
if (sequence.present()) {
try{
peekId = sequence.get().first;
sequenceNum = sequence.get().second;
if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = tag;
trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while (trackerData.sequence_version.size() &&
seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if (seqBegin->second.canBeSet()) {
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) {
throw operation_obsolete();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture();
if (fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if (t > trackerData.idleMax)
trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
begin = std::max(prevPeekData.first, begin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
} else {
throw;
}
}
}
state double blockStart = now();
if (returnIfBlocked && logData->version.get() < begin) {
replyPromise.sendError(end_of_stream());
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(begin, reqOnlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < begin) {
wait(logData->version.whenAtLeast(begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, tag);
if (poppedVer > begin) {
// reply with an empty message and let the next reply start from poppedVer
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence
// requests.
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // 1 tlog peek second attempt ended at a different version
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = begin;
}
replyPromise.send(rep);
return Void();
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, tag, begin, messages2, endVersion);
}
if (tag.locality == tagLocalityTxs || tag == txsTag) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin),
persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin),
persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
break;
}
if (sd.version >= begin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd)
break;
}
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1);
wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes));
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size())
break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4);
BinaryReader rd(queueEntryData, IncludeVersion());
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT(valid == 0x01);
ASSERT(length + sizeof(valid) == queueEntryData.size());
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, tag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
}
lastRefMessageVersion = entry.version;
index++;
}
messageReads.clear();
memoryReservation.release();
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
}
} else {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, tag, begin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()).
// detail("BeginVer", req.begin).detail("EndVer", reply.end).
// detail("MsgBytes", reply.messages.expectedSize()).
// detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart - queueStart;
double blockT = workStart - blockStart;
double workT = now() - workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if (queueT > trackerData.queueMax)
trackerData.queueMax = queueT;
if (blockT > trackerData.blockMax)
trackerData.blockMax = blockT;
if (workT > trackerData.workMax)
trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet()) {
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
// request might still be in the window of active requests, but LogSystemPeekCursor will
// throw away all future responses upon getting an operation_obsolete(), so computing a
// response will probably be a waste of CPU.
sequenceData.sendError(operation_obsolete());
}
return Void();
}
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // 1 tlog peek second attempt ended at a different version (2)
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = begin;
}
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() && peekTLogAndSend(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if(future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) {
reqTag.id = reqTag.id % logData->txsTags;
}
if (req.sequence.present()) {
if (reqSequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
peekId = reqSequence.get().first;
sequence = reqSequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
trackerData.tag = reqTag;
trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
@ -1981,12 +1604,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
reqBegin = std::max(prevPeekData.first, reqBegin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
replyPromise.sendError(e);
return Void();
} else {
throw;
@ -1996,32 +1619,32 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double blockStart = now();
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
if (req.sequence.present()) {
if (reqReturnIfBlocked && logData->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
if (logData->version.get() < reqBegin) {
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (req.tag.locality == tagLocalityLogRouter) {
if (reqTag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
@ -2032,8 +1655,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if (poppedVer > req.begin) {
Version poppedVer = poppedVersion(logData, reqTag);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
@ -2041,12 +1664,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
@ -2054,16 +1677,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = req.begin;
rep.begin = reqBegin;
}
req.reply.send(rep);
replyPromise.send(rep);
return Void();
}
@ -2071,24 +1694,24 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (req.onlySpilled) {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
}
if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) {
if (reqTag.locality == tagLocalityTxs || reqTag == txsTag) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin),
persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -2108,11 +1731,11 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
persistTagMessageRefsKey(logData->logId, reqTag, reqBegin),
persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
@ -2129,7 +1752,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
earlyEnd = true;
break;
}
if (sd.version >= req.begin) {
if (sd.version >= reqBegin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
@ -2171,7 +1794,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
}
@ -2191,25 +1814,25 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
}
} else {
if (req.onlySpilled) {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
@ -2233,7 +1856,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
@ -2242,19 +1865,61 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = req.begin;
reply.begin = reqBegin;
}
req.reply.send(reply);
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() &&
tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if (future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> watchDegraded(TLogData* self) {
if (g_network->isSimulated() && g_simulator.speedUpSimulation) {
return Void();
@ -2765,7 +2430,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
}
}
when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) {
logData->addActor.send(tLogPeekMessages(self, req, logData));
logData->addActor.send(tLogPeekMessages(
req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
}
when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "TLogPeekStream", logData->logId)

View File

@ -1587,418 +1587,37 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T
}
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR template<typename PromiseType>
Future<Void> peekTLogAndSend(PromiseType replyPromise, TLogData* self,
ACTOR template <typename PromiseType>
Future<Void> tLogPeekMessages(PromiseType replyPromise,
TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequenceNum = -1;
state UID peekId;
state double queueStart = now();
if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) {
tag.id = tag.id % logData->txsTags;
}
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests
// STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order
if (sequence.present()) {
try {
peekId = sequence.get().first;
sequenceNum = sequence.get().second;
if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = tag;
trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while (trackerData.sequence_version.size() &&
seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if (seqBegin->second.canBeSet()) {
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) {
throw operation_obsolete();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture();
if (fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if (t > trackerData.idleMax)
trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
begin = std::max(prevPeekData.first, begin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
} else {
throw;
}
}
}
state double blockStart = now();
if (returnIfBlocked && logData->version.get() < begin) {
replyPromise.sendError(end_of_stream());
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(begin, reqOnlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < begin) {
wait(logData->version.whenAtLeast(begin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, tag);
if (poppedVer > begin) {
// reply with an empty message and let the next reply start from poppedVer
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence
// requests.
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // xz tlog peek second attempt ended at a different version
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = begin;
}
replyPromise.send(rep);
return Void();
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (begin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, tag, begin, messages2, endVersion);
}
if (logData->shouldSpillByValue(tag)) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin),
persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin),
persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
break;
}
if (sd.version >= begin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd)
break;
}
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1);
wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes));
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size())
break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4);
BinaryReader rd(queueEntryData, IncludeVersion());
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT(valid == 0x01);
ASSERT(length + sizeof(valid) == queueEntryData.size());
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, tag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
.detail("UID", self->dbgid)
.detail("LogId", logData->logId)
.detail("PeekTag", tag);
}
lastRefMessageVersion = entry.version;
index++;
}
messageReads.clear();
memoryReservation.release();
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
}
} else {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, tag, begin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()).
// detail("BeginVer", req.begin).detail("EndVer", reply.end).
// detail("MsgBytes", reply.messages.expectedSize()).
// detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart - queueStart;
double blockT = workStart - blockStart;
double workT = now() - workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if (queueT > trackerData.queueMax)
trackerData.queueMax = queueT;
if (blockT > trackerData.blockMax)
trackerData.blockMax = blockT;
if (workT > trackerData.workMax)
trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet()) {
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
// request might still be in the window of active requests, but LogSystemPeekCursor will
// throw away all future responses upon getting an operation_obsolete(), so computing a
// response will probably be a waste of CPU.
sequenceData.sendError(operation_obsolete());
}
return Void();
}
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // xz tlog peek second attempt ended at a different version (2)
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = begin;
}
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() && peekTLogAndSend(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if(future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
req.tag.id = req.tag.id % logData->txsTags;
if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) {
reqTag.id = reqTag.id % logData->txsTags;
}
if (req.sequence.present()) {
if (reqSequence.present()) {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
peekId = reqSequence.get().first;
sequence = reqSequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
trackerData.tag = reqTag;
trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
@ -2025,12 +1644,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
reqBegin = std::max(prevPeekData.first, reqBegin);
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
replyPromise.sendError(e);
return Void();
} else {
throw;
@ -2040,33 +1659,33 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double blockStart = now();
if (req.returnIfBlocked && logData->version.get() < req.begin) {
req.reply.sendError(end_of_stream());
if (req.sequence.present()) {
if (reqReturnIfBlocked && logData->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < req.begin) {
wait(logData->version.whenAtLeast(req.begin));
if (logData->version.get() < reqBegin) {
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) {
if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait(delay(0.0, TaskPriority::Low));
}
if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) {
if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
@ -2077,8 +1696,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if (poppedVer > req.begin) {
Version poppedVer = poppedVersion(logData, reqTag);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
@ -2086,12 +1705,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
rep.end = poppedVer;
rep.onlySpilled = false;
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
@ -2099,16 +1718,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = req.begin;
rep.begin = reqBegin;
}
req.reply.send(rep);
replyPromise.send(rep);
return Void();
}
@ -2116,24 +1735,24 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
state bool onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
if (req.begin <= logData->persistentDataDurableVersion) {
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
if (req.onlySpilled) {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
}
if (logData->shouldSpillByValue(req.tag)) {
if (logData->shouldSpillByValue(reqTag)) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin),
persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -2153,11 +1772,11 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
persistTagMessageRefsKey(logData->logId, reqTag, reqBegin),
persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
@ -2174,7 +1793,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
earlyEnd = true;
break;
}
if (sd.version >= req.begin) {
if (sd.version >= reqBegin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
@ -2216,13 +1835,13 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
.detail("UID", self->dbgid)
.detail("LogId", logData->logId)
.detail("PeekTag", req.tag);
.detail("PeekTag", reqTag);
}
lastRefMessageVersion = entry.version;
@ -2240,28 +1859,28 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
}
}
} else {
if (req.onlySpilled) {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion);
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toValue();
reply.messages = StringRef(reply.arena, messages.toValue());
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()).
// detail("BeginVer", req.begin).detail("EndVer", reply.end).
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()).
// detail("BeginVer", reqBegin).detail("EndVer", reply.end).
// detail("MsgBytes", reply.messages.expectedSize()).
// detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
// detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
if (req.sequence.present()) {
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
@ -2285,9 +1904,9 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet()) {
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
// It would technically be more correct to .send({reqBegin, reqOnlySpilled}), as the next
// request might still be in the window of active requests, but LogSystemPeekCursor will
// throw away all future responses upon getting an operation_obsolete(), so computing a
// response will probably be a waste of CPU.
@ -2299,19 +1918,60 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
TEST(true); // tlog peek second attempt ended at a different version (2)
req.reply.sendError(operation_obsolete());
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
}
reply.begin = req.begin;
reply.begin = reqBegin;
}
req.reply.send(reply);
replyPromise.send(reply);
return Void();
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
self->activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() &&
tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled));
ASSERT(future.isReady());
if (future.isError()) {
throw future.getError();
}
reply.rep = future.get();
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
}
}
}
ACTOR Future<Void> doQueueCommit(TLogData* self,
Reference<LogData> logData,
@ -2814,7 +2474,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
logData->addActor.send(tLogPeekStream(self, req, logData));
}
when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) {
logData->addActor.send(tLogPeekMessages(self, req, logData));
logData->addActor.send(tLogPeekMessages(
req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
}
when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) {
logData->addActor.send(tLogPop(self, req, logData));

View File

@ -184,7 +184,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( FAST_NETWORK_LATENCY, 800e-6 );
init( SLOW_NETWORK_LATENCY, 100e-3 );
init( MAX_CLOGGING_LATENCY, 0 ); if( randomize && BUGGIFY ) MAX_CLOGGING_LATENCY = 0.1 * deterministicRandom()->random01();
init( MAX_BUGGIFIED_DELAY, 0 ); // if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01();
init( MAX_BUGGIFIED_DELAY, 0 ); if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01();
init( SIM_CONNECT_ERROR_MODE, deterministicRandom()->randomInt(0,3) );
//Tracefiles