diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index c0f39aa0b4..60b4c0168e 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -328,16 +328,14 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, NetNotifiedQueueWithAcknowledgements(int futures, int promises, const Endpoint& remoteEndpoint) : NotifiedQueue(futures, promises), FlowReceiver(remoteEndpoint, true) { // A ReplyPromiseStream will be terminated on the server side if the network connection with the client breaks - acknowledgements.failures = - tagError(makeDependent(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(remoteEndpoint), - operation_obsolete()); + acknowledgements.failures = tagError( + makeDependent(IFailureMonitor::failureMonitor()).onDisconnect(remoteEndpoint.getPrimaryAddress()), + operation_obsolete()); } void destroy() override { delete this; } void receive(ArenaObjectReader& reader) override { this->addPromiseRef(); - // TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceive") - // .detail("PromiseRef", this->getPromiseReferenceCount()); ErrorOr> message; reader.deserialize(message); @@ -371,8 +369,6 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, this->send(std::move(message.get().asUnderlyingType())); } this->delPromiseRef(); - // TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceiveEnd") - // .detail("PromiseRef", this->getPromiseReferenceCount()); } T pop() override { @@ -698,18 +694,20 @@ public: template ReplyPromiseStream getReplyStream(const X& value) const { - auto p = getReplyPromiseStream(value); if (queue->isRemoteEndpoint()) { Future disc = makeDependent(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint()); + auto& p = getReplyPromiseStream(value); Reference peer = FlowTransport::transport().sendUnreliable(SerializeSource(value), getEndpoint(), true); // FIXME: defer sending the message until we know the connection is established endStreamOnDisconnect(disc, p, getEndpoint(), peer); + return p; } else { send(value); + auto& p = getReplyPromiseStream(value); + return p; } - return p; } // stream.getReplyUnlessFailedFor( request, double sustainedFailureDuration, double sustainedFailureSlope ) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 55706e458f..dad17a1a67 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -815,8 +815,7 @@ ACTOR static Future finishMoveKeys(Database occ, // Wait for a durable quorum of servers in destServers to have keys available (readWrite) // They must also have at least the transaction read version so they can't "forget" the shard - // between - // now and when this transaction commits. + // between now and when this transaction commits. state vector> serverReady; // only for count below state vector> tssReady; // for waiting in parallel with tss state vector tssReadyInterfs; diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index d5fafab6c7..35d142b9f7 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -975,51 +975,62 @@ void peekMessagesFromMemory(Reference self, } // Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request -ACTOR Future peekTLog(TLogData* self, - Reference logData, - Version begin, - Tag tag, - bool returnIfBlocked = false, - bool reqOnlySpilled = false, - Optional> sequence = Optional>()) { +ACTOR template +Future tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional> reqSequence = Optional>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); - state int sequenceNum = -1; + state int sequence = -1; state UID peekId; - state double queueStart = now(); - state OldTag oldTag = convertTag(tag); + state OldTag oldTag = convertTag(reqTag); - if (sequence.present()) { - peekId = sequence.get().first; - sequenceNum = sequence.get().second; - if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - self->peekTracker.find(peekId) == self->peekTracker.end()) { - throw operation_obsolete(); - } - if (sequenceNum > 0) { - auto& trackerData = self->peekTracker[peekId]; - trackerData.lastUpdate = now(); - Version ver = wait(trackerData.sequence_version[sequenceNum].getFuture()); - begin = std::max(ver, begin); - wait(yield()); + if (reqSequence.present()) { + try { + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; + if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && + self->peekTracker.find(peekId) == self->peekTracker.end()) { + throw operation_obsolete(); + } + if (sequence > 0) { + auto& trackerData = self->peekTracker[peekId]; + trackerData.lastUpdate = now(); + Version ver = wait(trackerData.sequence_version[sequence].getFuture()); + reqBegin = std::max(ver, reqBegin); + wait(yield()); + } + } catch (Error& e) { + if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { + replyPromise.sendError(e); + return Void(); + } else { + throw; + } } } - if (returnIfBlocked && logData->version.get() < begin) { - throw end_of_stream(); + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < begin) { - wait(logData->version.whenAtLeast(begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } state Version endVersion = logData->version.get() + 1; Version poppedVer = poppedVersion(logData, oldTag); - if (poppedVer > begin) { + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = 0; @@ -1027,47 +1038,50 @@ ACTOR Future peekTLog(TLogData* self, rep.end = poppedVer; rep.onlySpilled = false; - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { + if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); - throw operation_obsolete(); + return Void(); } if (sequenceData.isSet()) { if (sequenceData.getFuture().get() != rep.end) { - TEST(true); // 0 tlog peek second attempt ended at a different version - throw operation_obsolete(); + TEST(true); // tlog peek second attempt ended at a different version + replyPromise.sendError(operation_obsolete()); + return Void(); } } else { sequenceData.send(rep.end); } - rep.begin = begin; + rep.begin = reqBegin; } - return rep; + replyPromise.send(rep); + return Void(); } // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? - peekMessagesFromMemory(logData, tag, begin, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, begin), - persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->DESIRED_TOTAL_BYTES, - SERVER_KNOBS->DESIRED_TOTAL_BYTES)); + KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, reqBegin), + persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), + SERVER_KNOBS->DESIRED_TOTAL_BYTES, + SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto& kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -1079,7 +1093,7 @@ ACTOR Future peekTLog(TLogData* self, uint32_t subVersion; rd >> messageLength >> subVersion; messageLength += sizeof(uint16_t) + sizeof(Tag); - messages << messageLength << subVersion << uint16_t(1) << tag; + messages << messageLength << subVersion << uint16_t(1) << reqTag; messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag)); messages.serializeBytes(rd.readBytes(messageLength), messageLength); } @@ -1090,8 +1104,8 @@ ACTOR Future peekTLog(TLogData* self, else messages.serializeBytes(messages2.toValue()); } else { - peekMessagesFromMemory(logData, tag, begin, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; @@ -1101,40 +1115,48 @@ ACTOR Future peekTLog(TLogData* self, reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; trackerData.lastUpdate = now(); - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (sequenceData.isSet()) { if (sequenceData.getFuture().get() != reply.end) { - TEST(true); // 0 tlog peek second attempt ended at a different version (2) - throw operation_obsolete(); + TEST(true); // tlog peek second attempt ended at a different version (2) + replyPromise.sendError(operation_obsolete()); + return Void(); } } else { sequenceData.send(reply.end); } - reply.begin = begin; + reply.begin = reqBegin; } - return reply; + replyPromise.send(reply); + return Void(); } // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { self->activePeekStreams++; - TraceEvent(SevDebug, "TLogPeekStream", logData->logId).detail("Token", req.reply.getEndpoint().token); state Version begin = req.begin; state bool onlySpilled = false; - req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); loop { state TLogPeekStreamReply reply; + state Promise promise; + state Future future(promise.getFuture()); try { wait(req.reply.onReady() && - store(reply.rep, peekTLog(self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled))); + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + ASSERT(future.isReady()); + if (future.isError()) { + throw future.getError(); + } + + reply.rep = future.get(); req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; @@ -1157,161 +1179,6 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } - -ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference logData) { - state BinaryWriter messages(Unversioned()); - state BinaryWriter messages2(Unversioned()); - state int sequence = -1; - state UID peekId; - state OldTag oldTag = convertTag(req.tag); - - if (req.sequence.present()) { - try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; - if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - self->peekTracker.find(peekId) == self->peekTracker.end()) { - throw operation_obsolete(); - } - if (sequence > 0) { - auto& trackerData = self->peekTracker[peekId]; - trackerData.lastUpdate = now(); - Version ver = wait(trackerData.sequence_version[sequence].getFuture()); - req.begin = std::max(ver, req.begin); - wait(yield()); - } - } catch (Error& e) { - if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); - return Void(); - } else { - throw; - } - } - } - - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - return Void(); - } - - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); - wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); - } - - state Version endVersion = logData->version.get() + 1; - - Version poppedVer = poppedVersion(logData, oldTag); - if (poppedVer > req.begin) { - TLogPeekReply rep; - rep.maxKnownVersion = logData->version.get(); - rep.minKnownCommittedVersion = 0; - rep.popped = poppedVer; - rep.end = poppedVer; - rep.onlySpilled = false; - - if (req.sequence.present()) { - auto& trackerData = self->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - return Void(); - } - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get() != rep.end) { - TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(rep.end); - } - rep.begin = req.begin; - } - - req.reply.send(rep); - return Void(); - } - - // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { - // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We - // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if - // an initial attempt to read from disk results in insufficient data and the required data is no longer in - // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the - // result? - - peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion); - - RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, req.begin), - persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->DESIRED_TOTAL_BYTES, - SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); - - for (auto& kv : kvs) { - auto ver = decodeTagMessagesKey(kv.key); - messages << int32_t(-1) << ver; - - BinaryReader rd(kv.value, Unversioned()); - while (!rd.empty()) { - int32_t messageLength; - uint32_t subVersion; - rd >> messageLength >> subVersion; - messageLength += sizeof(uint16_t) + sizeof(Tag); - messages << messageLength << subVersion << uint16_t(1) << req.tag; - messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag)); - messages.serializeBytes(rd.readBytes(messageLength), messageLength); - } - } - - if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) - endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; - else - messages.serializeBytes(messages2.toValue()); - } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); - } - - TLogPeekReply reply; - reply.maxKnownVersion = logData->version.get(); - reply.minKnownCommittedVersion = 0; - reply.onlySpilled = false; - reply.messages = messages.toValue(); - reply.end = endVersion; - - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); - - if (req.sequence.present()) { - auto& trackerData = self->peekTracker[peekId]; - trackerData.lastUpdate = now(); - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get() != reply.end) { - TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(reply.end); - } - reply.begin = req.begin; - } - - req.reply.send(reply); - return Void(); -} - ACTOR Future doQueueCommit(TLogData* self, Reference logData) { state Version ver = logData->version.get(); state Version commitNumber = self->queueCommitBegin + 1; @@ -1476,7 +1343,8 @@ ACTOR Future serveTLogInterface(TLogData* self, PromiseStream warningCollectorInput) { loop choose { when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { TraceEvent(SevDebug, "TLogPeekStream", logData->logId) diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index b6964bf9c9..5c27581b2c 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1208,301 +1208,37 @@ void peekMessagesFromMemory(Reference self, } // Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request -ACTOR Future peekTLog(TLogData* self, - Reference logData, - Version begin, - Tag tag, - bool returnIfBlocked = false, - bool reqOnlySpilled = false, - Optional> sequence = Optional>()) { - state BinaryWriter messages(Unversioned()); - state BinaryWriter messages2(Unversioned()); - state int sequenceNum = -1; - state UID peekId; - state double queueStart = now(); - - if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) { - tag.id = tag.id % logData->txsTags; - } - - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests - // STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order - if (sequence.present()) { - peekId = sequence.get().first; - sequenceNum = sequence.get().second; - if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - logData->peekTracker.find(peekId) == logData->peekTracker.end()) { - throw operation_obsolete(); - } - auto& trackerData = logData->peekTracker[peekId]; - if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = tag; - trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled)); - } - auto seqBegin = trackerData.sequence_version.begin(); - // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. - while (trackerData.sequence_version.size() && - seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { - if (seqBegin->second.canBeSet()) { - seqBegin->second.sendError(operation_obsolete()); - } - trackerData.sequence_version.erase(seqBegin); - seqBegin = trackerData.sequence_version.begin(); - } - - if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) { - throw operation_obsolete(); - } - - Future> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture(); - if (fPrevPeekData.isReady()) { - trackerData.unblockedPeeks++; - double t = now() - trackerData.lastUpdate; - if (t > trackerData.idleMax) - trackerData.idleMax = t; - trackerData.idleTime += t; - } - trackerData.lastUpdate = now(); - std::pair prevPeekData = wait(fPrevPeekData); - begin = std::max(prevPeekData.first, begin); - reqOnlySpilled = prevPeekData.second; - wait(yield()); - } - - state double blockStart = now(); - - if (returnIfBlocked && logData->version.get() < begin) { - if (sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - trackerData.lastUpdate = now(); - if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(begin, reqOnlySpilled)); - } - } - throw end_of_stream(); - } - - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < begin) { - wait(logData->version.whenAtLeast(begin)); - wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); - } - - if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) { - wait(self->concurrentLogRouterReads.take()); - state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); - wait(delay(0.0, TaskPriority::Low)); - } - - if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) { - // Reading spilled data will almost always imply that the storage server is >5s behind the rest - // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up - // slightly faster over keeping the rest of the cluster operating normally. - // txsTag is only ever peeked on recovery, and we would still wish to prioritize requests - // that impact recovery duration. - wait(delay(0, TaskPriority::TLogSpilledPeekReply)); - } - - state double workStart = now(); - - Version poppedVer = poppedVersion(logData, tag); - if (poppedVer > begin) { - // reply with an empty message and let the next reply start from poppedVer - TLogPeekReply rep; - rep.maxKnownVersion = logData->version.get(); - rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; - rep.popped = poppedVer; - rep.end = poppedVer; - rep.onlySpilled = false; - - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence - // requests. - if (sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - throw operation_obsolete(); - } - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // 1 tlog peek second attempt ended at a different version - throw operation_obsolete(); - } - } else { - sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); - } - rep.begin = begin; - } - - return rep; - } - - state Version endVersion = logData->version.get() + 1; - state bool onlySpilled = false; - - // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (begin <= logData->persistentDataDurableVersion) { - // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We - // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if - // an initial attempt to read from disk results in insufficient data and the required data is no longer in - // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the - // result? - - if (reqOnlySpilled) { - endVersion = logData->persistentDataDurableVersion + 1; - } else { - peekMessagesFromMemory(logData, tag, begin, messages2, endVersion); - } - - RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin), - persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->DESIRED_TOTAL_BYTES, - SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); - - for (auto& kv : kvs) { - auto ver = decodeTagMessagesKey(kv.key); - messages << VERSION_HEADER << ver; - messages.serializeBytes(kv.value); - } - - if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; - onlySpilled = true; - } else { - messages.serializeBytes(messages2.toValue()); - } - } else { - peekMessagesFromMemory(logData, tag, begin, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); - } - - TLogPeekReply reply; - reply.maxKnownVersion = logData->version.get(); - reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = StringRef(reply.arena, messages.toValue()); - reply.end = endVersion; - reply.onlySpilled = onlySpilled; - - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", reply.getEndpoint().address); - - if (sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - trackerData.lastUpdate = now(); - - double queueT = blockStart - queueStart; - double blockT = workStart - blockStart; - double workT = now() - workStart; - - trackerData.totalPeeks++; - trackerData.replyBytes += reply.messages.size(); - - if (queueT > trackerData.queueMax) - trackerData.queueMax = queueT; - if (blockT > trackerData.blockMax) - trackerData.blockMax = blockT; - if (workT > trackerData.workMax) - trackerData.workMax = workT; - - trackerData.queueTime += queueT; - trackerData.blockTime += blockT; - trackerData.workTime += workT; - - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { - if (!sequenceData.isSet()) { - // It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next - // request might still be in the window of active requests, but LogSystemPeekCursor will - // throw away all future responses upon getting an operation_obsolete(), so computing a - // response will probably be a waste of CPU. - sequenceData.sendError(operation_obsolete()); - } - throw operation_obsolete(); - } - if (sequenceData.isSet()) { - trackerData.duplicatePeeks++; - if (sequenceData.getFuture().get().first != reply.end) { - TEST(true); // 1 tlog peek second attempt ended at a different version (2) - throw operation_obsolete(); - } - } else { - sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); - } - reply.begin = begin; - } - - return reply; -} - -// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover -ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { - self->activePeekStreams++; - - state Version begin = req.begin; - state bool onlySpilled = false; - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; - } - req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); - loop { - state TLogPeekStreamReply reply; - try { - wait(req.reply.onReady() && - store(reply.rep, peekTLog(self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled))); - req.reply.send(reply); - begin = reply.rep.end; - onlySpilled = reply.rep.onlySpilled; - if (reply.rep.end > logData->version.get()) { - wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); - } else { - wait(delay(0, g_network->getCurrentTask())); - } - } catch (Error& e) { - self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); - - if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); - return Void(); - } else { - throw; - } - } - } -} - - -ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference logData) { +ACTOR template +Future tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional> reqSequence = Optional>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; state double queueStart = now(); - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = logData->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); while (trackerData.sequence_version.size() && @@ -1529,12 +1265,12 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen trackerData.lastUpdate = now(); std::pair prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -1544,32 +1280,32 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double blockStart = now(); - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) { + if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. @@ -1580,8 +1316,8 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double workStart = now(); - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1589,12 +1325,12 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen rep.end = poppedVer; rep.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1602,16 +1338,16 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = req.begin; + rep.begin = reqBegin; } - req.reply.send(rep); + replyPromise.send(rep); return Void(); } @@ -1619,27 +1355,27 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? - if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto& kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -1654,20 +1390,20 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen messages.serializeBytes(messages2.toValue()); } } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().address); - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1691,7 +1427,7 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1700,19 +1436,60 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); return Void(); } +// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover +ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise promise; + state Future future(promise.getFuture()); + try { + wait(req.reply.onReady() && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + ASSERT(future.isReady()); + if (future.isError()) { + throw future.getError(); + } + + reply.rep = future.get(); + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} ACTOR Future doQueueCommit(TLogData* self, Reference logData, @@ -2208,7 +1985,8 @@ ACTOR Future serveTLogInterface(TLogData* self, } } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { TraceEvent(SevDebug, "TLogPeekStream", logData->logId) diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 88029f9eb1..81025d93de 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -1547,67 +1547,66 @@ ACTOR Future> parseMessagesForTag(StringRef commitBlob, T } // Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request -ACTOR template -Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, - Reference logData, - Version begin, - Tag tag, - bool returnIfBlocked = false, - bool reqOnlySpilled = false, - Optional> sequence = Optional>()) { +ACTOR template +Future tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional> reqSequence = Optional>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); - state int sequenceNum = -1; + state int sequence = -1; state UID peekId; state double queueStart = now(); - if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) { - tag.id = tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests - // STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order - if (sequence.present()) { - try{ - peekId = sequence.get().first; - sequenceNum = sequence.get().second; - if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - logData->peekTracker.find(peekId) == logData->peekTracker.end()) { - throw operation_obsolete(); - } - auto& trackerData = logData->peekTracker[peekId]; - if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = tag; - trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled)); - } - auto seqBegin = trackerData.sequence_version.begin(); - // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. - while (trackerData.sequence_version.size() && - seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { - if (seqBegin->second.canBeSet()) { - seqBegin->second.sendError(operation_obsolete()); - } - trackerData.sequence_version.erase(seqBegin); - seqBegin = trackerData.sequence_version.begin(); - } + if (reqSequence.present()) { + try { + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; + if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && + logData->peekTracker.find(peekId) == logData->peekTracker.end()) { + throw operation_obsolete(); + } + auto& trackerData = logData->peekTracker[peekId]; + if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); + } + auto seqBegin = trackerData.sequence_version.begin(); + // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. + while (trackerData.sequence_version.size() && + seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { + if (seqBegin->second.canBeSet()) { + seqBegin->second.sendError(operation_obsolete()); + } + trackerData.sequence_version.erase(seqBegin); + seqBegin = trackerData.sequence_version.begin(); + } - if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) { - throw operation_obsolete(); - } + if (trackerData.sequence_version.size() && sequence < seqBegin->first) { + throw operation_obsolete(); + } - Future> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture(); - if (fPrevPeekData.isReady()) { - trackerData.unblockedPeeks++; - double t = now() - trackerData.lastUpdate; - if (t > trackerData.idleMax) - trackerData.idleMax = t; - trackerData.idleTime += t; - } - trackerData.lastUpdate = now(); - std::pair prevPeekData = wait(fPrevPeekData); - begin = std::max(prevPeekData.first, begin); - reqOnlySpilled = prevPeekData.second; - wait(yield()); + Future> fPrevPeekData = trackerData.sequence_version[sequence].getFuture(); + if (fPrevPeekData.isReady()) { + trackerData.unblockedPeeks++; + double t = now() - trackerData.lastUpdate; + if (t > trackerData.idleMax) + trackerData.idleMax = t; + trackerData.idleTime += t; + } + trackerData.lastUpdate = now(); + std::pair prevPeekData = wait(fPrevPeekData); + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; + wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { replyPromise.sendError(e); @@ -1620,33 +1619,32 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state double blockStart = now(); - if (returnIfBlocked && logData->version.get() < begin) { - replyPromise.sendError(end_of_stream()); - if (sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - trackerData.lastUpdate = now(); + auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(begin, reqOnlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < begin) { - wait(logData->version.whenAtLeast(begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) { + if (reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. @@ -1657,9 +1655,8 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state double workStart = now(); - Version poppedVer = poppedVersion(logData, tag); - if (poppedVer > begin) { - // reply with an empty message and let the next reply start from poppedVer + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1667,13 +1664,11 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, rep.end = poppedVer; rep.onlySpilled = false; - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence - // requests. - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { + if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); @@ -1681,14 +1676,14 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, } if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // 1 tlog peek second attempt ended at a different version + TEST(true); // tlog peek second attempt ended at a different version replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = begin; + rep.begin = reqBegin; } replyPromise.send(rep); @@ -1699,8 +1694,8 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in @@ -1710,13 +1705,13 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, tag, begin, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } - if (tag.locality == tagLocalityTxs || tag == txsTag) { + if (reqTag.locality == tagLocalityTxs || reqTag == txsTag) { RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin), - persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); @@ -1735,11 +1730,12 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, } else { // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. RangeResult kvrefs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin), - persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef( + persistTagMessageRefsKey(logData->logId, reqTag, reqBegin), + persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); state std::vector> commitLocations; state bool earlyEnd = false; @@ -1756,7 +1752,7 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, earlyEnd = true; break; } - if (sd.version >= begin) { + if (sd.version >= reqBegin) { firstVersion = std::min(firstVersion, sd.version); const IDiskQueue::location end = sd.start.lo + sd.length; commitLocations.emplace_back(sd.start, end); @@ -1798,7 +1794,7 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, messages << VERSION_HEADER << entry.version; std::vector rawMessages = - wait(parseMessagesForTag(entry.messages, tag, logData->logRouterTags)); + wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags)); for (const StringRef& msg : rawMessages) { messages.serializeBytes(msg); } @@ -1821,10 +1817,10 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, tag, begin, messages, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); } - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; @@ -1834,12 +1830,9 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()). - // detail("BeginVer", req.begin).detail("EndVer", reply.end). - // detail("MsgBytes", reply.messages.expectedSize()). - // detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1861,29 +1854,24 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, trackerData.blockTime += blockT; trackerData.workTime += workT; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { + auto& sequenceData = trackerData.sequence_version[sequence + 1]; + if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); - if (!sequenceData.isSet()) { - // It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next - // request might still be in the window of active requests, but LogSystemPeekCursor will - // throw away all future responses upon getting an operation_obsolete(), so computing a - // response will probably be a waste of CPU. + if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); - } return Void(); } if (sequenceData.isSet()) { trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { - TEST(true); // 1 tlog peek second attempt ended at a different version (2) + TEST(true); // tlog peek second attempt ended at a different version (2) replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = begin; + reply.begin = reqBegin; } replyPromise.send(reply); @@ -1902,9 +1890,10 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref state Promise promise; state Future future(promise.getFuture()); try { - wait(req.reply.onReady() && peekTLogAndSend(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + wait(req.reply.onReady() && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); ASSERT(future.isReady()); - if(future.isError()) { + if (future.isError()) { throw future.getError(); } @@ -1931,330 +1920,6 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } - -ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference logData) { - state BinaryWriter messages(Unversioned()); - state BinaryWriter messages2(Unversioned()); - state int sequence = -1; - state UID peekId; - state double queueStart = now(); - - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; - } - - if (req.sequence.present()) { - try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; - if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - logData->peekTracker.find(peekId) == logData->peekTracker.end()) { - throw operation_obsolete(); - } - auto& trackerData = logData->peekTracker[peekId]; - if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); - } - auto seqBegin = trackerData.sequence_version.begin(); - // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. - while (trackerData.sequence_version.size() && - seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { - if (seqBegin->second.canBeSet()) { - seqBegin->second.sendError(operation_obsolete()); - } - trackerData.sequence_version.erase(seqBegin); - seqBegin = trackerData.sequence_version.begin(); - } - - if (trackerData.sequence_version.size() && sequence < seqBegin->first) { - throw operation_obsolete(); - } - - Future> fPrevPeekData = trackerData.sequence_version[sequence].getFuture(); - if (fPrevPeekData.isReady()) { - trackerData.unblockedPeeks++; - double t = now() - trackerData.lastUpdate; - if (t > trackerData.idleMax) - trackerData.idleMax = t; - trackerData.idleTime += t; - } - trackerData.lastUpdate = now(); - std::pair prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; - wait(yield()); - } catch (Error& e) { - if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); - return Void(); - } else { - throw; - } - } - } - - state double blockStart = now(); - - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); - } - } - return Void(); - } - - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); - wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); - } - - if (req.tag.locality == tagLocalityLogRouter) { - wait(self->concurrentLogRouterReads.take()); - state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); - wait(delay(0.0, TaskPriority::Low)); - } - - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { - // Reading spilled data will almost always imply that the storage server is >5s behind the rest - // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up - // slightly faster over keeping the rest of the cluster operating normally. - // txsTag is only ever peeked on recovery, and we would still wish to prioritize requests - // that impact recovery duration. - wait(delay(0, TaskPriority::TLogSpilledPeekReply)); - } - - state double workStart = now(); - - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { - TLogPeekReply rep; - rep.maxKnownVersion = logData->version.get(); - rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; - rep.popped = poppedVer; - rep.end = poppedVer; - rep.onlySpilled = false; - - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - return Void(); - } - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); - } - rep.begin = req.begin; - } - - req.reply.send(rep); - return Void(); - } - - state Version endVersion = logData->version.get() + 1; - state bool onlySpilled = false; - - // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { - // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We - // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if - // an initial attempt to read from disk results in insufficient data and the required data is no longer in - // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the - // result? - - if (req.onlySpilled) { - endVersion = logData->persistentDataDurableVersion + 1; - } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion); - } - - if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) { - RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->DESIRED_TOTAL_BYTES, - SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - - for (auto& kv : kvs) { - auto ver = decodeTagMessagesKey(kv.key); - messages << VERSION_HEADER << ver; - messages.serializeBytes(kv.value); - } - - if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; - onlySpilled = true; - } else { - messages.serializeBytes(messages2.toValue()); - } - } else { - // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. - RangeResult kvrefs = wait(self->persistentData->readRange( - KeyRangeRef( - persistTagMessageRefsKey(logData->logId, req.tag, req.begin), - persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); - - state std::vector> commitLocations; - state bool earlyEnd = false; - uint32_t mutationBytes = 0; - state uint64_t commitBytes = 0; - state Version firstVersion = std::numeric_limits::max(); - for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) { - auto& kv = kvrefs[i]; - VectorRef spilledData; - BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion)); - r >> spilledData; - for (const SpilledData& sd : spilledData) { - if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - earlyEnd = true; - break; - } - if (sd.version >= req.begin) { - firstVersion = std::min(firstVersion, sd.version); - const IDiskQueue::location end = sd.start.lo + sd.length; - commitLocations.emplace_back(sd.start, end); - // This isn't perfect, because we aren't accounting for page boundaries, but should be - // close enough. - commitBytes += sd.length; - mutationBytes += sd.mutationBytes; - } - } - if (earlyEnd) - break; - } - earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1); - wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes)); - state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes); - state std::vector>> messageReads; - messageReads.reserve(commitLocations.size()); - for (const auto& pair : commitLocations) { - messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True)); - } - commitLocations.clear(); - wait(waitForAll(messageReads)); - - state Version lastRefMessageVersion = 0; - state int index = 0; - loop { - if (index >= messageReads.size()) - break; - Standalone queueEntryData = messageReads[index].get(); - uint8_t valid; - const uint32_t length = *(uint32_t*)queueEntryData.begin(); - queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4); - BinaryReader rd(queueEntryData, IncludeVersion()); - state TLogQueueEntry entry; - rd >> entry >> valid; - ASSERT(valid == 0x01); - ASSERT(length + sizeof(valid) == queueEntryData.size()); - - messages << VERSION_HEADER << entry.version; - - std::vector rawMessages = - wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags)); - for (const StringRef& msg : rawMessages) { - messages.serializeBytes(msg); - } - - lastRefMessageVersion = entry.version; - index++; - } - - messageReads.clear(); - memoryReservation.release(); - - if (earlyEnd) { - endVersion = lastRefMessageVersion + 1; - onlySpilled = true; - } else { - messages.serializeBytes(messages2.toValue()); - } - } - } else { - if (req.onlySpilled) { - endVersion = logData->persistentDataDurableVersion + 1; - } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion); - } - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); - } - - TLogPeekReply reply; - reply.maxKnownVersion = logData->version.get(); - reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); - reply.end = endVersion; - reply.onlySpilled = onlySpilled; - - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); - - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - trackerData.lastUpdate = now(); - - double queueT = blockStart - queueStart; - double blockT = workStart - blockStart; - double workT = now() - workStart; - - trackerData.totalPeeks++; - trackerData.replyBytes += reply.messages.size(); - - if (queueT > trackerData.queueMax) - trackerData.queueMax = queueT; - if (blockT > trackerData.blockMax) - trackerData.blockMax = blockT; - if (workT > trackerData.workMax) - trackerData.workMax = workT; - - trackerData.queueTime += queueT; - trackerData.blockTime += blockT; - trackerData.workTime += workT; - - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - return Void(); - } - if (sequenceData.isSet()) { - trackerData.duplicatePeeks++; - if (sequenceData.getFuture().get().first != reply.end) { - TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); - } - reply.begin = req.begin; - } - - req.reply.send(reply); - return Void(); -} - ACTOR Future watchDegraded(TLogData* self) { if (g_network->isSimulated() && g_simulator.speedUpSimulation) { return Void(); @@ -2765,7 +2430,8 @@ ACTOR Future serveTLogInterface(TLogData* self, } } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { TraceEvent(SevDebug, "TLogPeekStream", logData->logId) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 3ca0926135..aa2afd5406 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1587,67 +1587,66 @@ ACTOR Future> parseMessagesForTag(StringRef commitBlob, T } // Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request -ACTOR template -Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, - Reference logData, - Version begin, - Tag tag, - bool returnIfBlocked = false, - bool reqOnlySpilled = false, - Optional> sequence = Optional>()) { +ACTOR template +Future tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional> reqSequence = Optional>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); - state int sequenceNum = -1; + state int sequence = -1; state UID peekId; state double queueStart = now(); - if (tag.locality == tagLocalityTxs && tag.id >= logData->txsTags && logData->txsTags > 0) { - tag.id = tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests - // STEP: a. mark obsolete sequence requests; b. wait previous sequence requests are handled in order - if (sequence.present()) { - try { - peekId = sequence.get().first; - sequenceNum = sequence.get().second; - if (sequenceNum >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - logData->peekTracker.find(peekId) == logData->peekTracker.end()) { - throw operation_obsolete(); - } - auto& trackerData = logData->peekTracker[peekId]; - if (sequenceNum == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = tag; - trackerData.sequence_version[0].send(std::make_pair(begin, reqOnlySpilled)); - } - auto seqBegin = trackerData.sequence_version.begin(); - // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. - while (trackerData.sequence_version.size() && - seqBegin->first <= sequenceNum - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { - if (seqBegin->second.canBeSet()) { - seqBegin->second.sendError(operation_obsolete()); - } - trackerData.sequence_version.erase(seqBegin); - seqBegin = trackerData.sequence_version.begin(); - } + if (reqSequence.present()) { + try { + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; + if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && + logData->peekTracker.find(peekId) == logData->peekTracker.end()) { + throw operation_obsolete(); + } + auto& trackerData = logData->peekTracker[peekId]; + if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); + } + auto seqBegin = trackerData.sequence_version.begin(); + // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. + while (trackerData.sequence_version.size() && + seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { + if (seqBegin->second.canBeSet()) { + seqBegin->second.sendError(operation_obsolete()); + } + trackerData.sequence_version.erase(seqBegin); + seqBegin = trackerData.sequence_version.begin(); + } - if (trackerData.sequence_version.size() && sequenceNum < seqBegin->first) { - throw operation_obsolete(); - } + if (trackerData.sequence_version.size() && sequence < seqBegin->first) { + throw operation_obsolete(); + } - Future> fPrevPeekData = trackerData.sequence_version[sequenceNum].getFuture(); - if (fPrevPeekData.isReady()) { - trackerData.unblockedPeeks++; - double t = now() - trackerData.lastUpdate; - if (t > trackerData.idleMax) - trackerData.idleMax = t; - trackerData.idleTime += t; - } - trackerData.lastUpdate = now(); - std::pair prevPeekData = wait(fPrevPeekData); - begin = std::max(prevPeekData.first, begin); - reqOnlySpilled = prevPeekData.second; - wait(yield()); + Future> fPrevPeekData = trackerData.sequence_version[sequence].getFuture(); + if (fPrevPeekData.isReady()) { + trackerData.unblockedPeeks++; + double t = now() - trackerData.lastUpdate; + if (t > trackerData.idleMax) + trackerData.idleMax = t; + trackerData.idleTime += t; + } + trackerData.lastUpdate = now(); + std::pair prevPeekData = wait(fPrevPeekData); + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; + wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { replyPromise.sendError(e); @@ -1660,33 +1659,33 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state double blockStart = now(); - if (returnIfBlocked && logData->version.get() < begin) { - replyPromise.sendError(end_of_stream()); - if (sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(begin, reqOnlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < begin) { - wait(logData->version.whenAtLeast(begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (logData->locality != tagLocalitySatellite && tag.locality == tagLocalityLogRouter) { + if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (begin <= logData->persistentDataDurableVersion && tag.locality != tagLocalityTxs && tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. @@ -1697,9 +1696,8 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state double workStart = now(); - Version poppedVer = poppedVersion(logData, tag); - if (poppedVer > begin) { - // reply with an empty message and let the next reply start from poppedVer + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1707,13 +1705,11 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, rep.end = poppedVer; rep.onlySpilled = false; - // TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence - // requests. - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; + auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { + if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); @@ -1721,14 +1717,14 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, } if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // xz tlog peek second attempt ended at a different version + TEST(true); // tlog peek second attempt ended at a different version replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = begin; + rep.begin = reqBegin; } replyPromise.send(rep); @@ -1739,8 +1735,8 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in @@ -1750,13 +1746,13 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, tag, begin, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } - if (logData->shouldSpillByValue(tag)) { + if (logData->shouldSpillByValue(reqTag)) { RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, tag, begin), - persistTagMessagesKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); @@ -1775,11 +1771,12 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, } else { // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. RangeResult kvrefs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessageRefsKey(logData->logId, tag, begin), - persistTagMessageRefsKey(logData->logId, tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef( + persistTagMessageRefsKey(logData->logId, reqTag, reqBegin), + persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); state std::vector> commitLocations; state bool earlyEnd = false; @@ -1796,7 +1793,7 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, earlyEnd = true; break; } - if (sd.version >= begin) { + if (sd.version >= reqBegin) { firstVersion = std::min(firstVersion, sd.version); const IDiskQueue::location end = sd.start.lo + sd.length; commitLocations.emplace_back(sd.start, end); @@ -1838,13 +1835,13 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, messages << VERSION_HEADER << entry.version; std::vector rawMessages = - wait(parseMessagesForTag(entry.messages, tag, logData->logRouterTags)); + wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags)); for (const StringRef& msg : rawMessages) { messages.serializeBytes(msg); DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg) .detail("UID", self->dbgid) .detail("LogId", logData->logId) - .detail("PeekTag", tag); + .detail("PeekTag", reqTag); } lastRefMessageVersion = entry.version; @@ -1865,10 +1862,10 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, tag, begin, messages, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); } - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; @@ -1878,12 +1875,12 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()). - // detail("BeginVer", req.begin).detail("EndVer", reply.end). + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()). + // detail("BeginVer", reqBegin).detail("EndVer", reply.end). // detail("MsgBytes", reply.messages.expectedSize()). - // detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + // detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1905,11 +1902,11 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, trackerData.blockTime += blockT; trackerData.workTime += workT; - auto& sequenceData = trackerData.sequence_version[sequenceNum + 1]; - if (trackerData.sequence_version.size() && sequenceNum + 1 < trackerData.sequence_version.begin()->first) { + auto& sequenceData = trackerData.sequence_version[sequence + 1]; + if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) { - // It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next + // It would technically be more correct to .send({reqBegin, reqOnlySpilled}), as the next // request might still be in the window of active requests, but LogSystemPeekCursor will // throw away all future responses upon getting an operation_obsolete(), so computing a // response will probably be a waste of CPU. @@ -1920,21 +1917,20 @@ Future peekTLogAndSend(PromiseType replyPromise, TLogData* self, if (sequenceData.isSet()) { trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { - TEST(true); // xz tlog peek second attempt ended at a different version (2) + TEST(true); // tlog peek second attempt ended at a different version (2) replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = begin; + reply.begin = reqBegin; } replyPromise.send(reply); return Void(); } - // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { self->activePeekStreams++; @@ -1947,9 +1943,10 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref state Promise promise; state Future future(promise.getFuture()); try { - wait(req.reply.onReady() && peekTLogAndSend(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + wait(req.reply.onReady() && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); ASSERT(future.isReady()); - if(future.isError()) { + if (future.isError()) { throw future.getError(); } @@ -1976,343 +1973,6 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } -ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference logData) { - state BinaryWriter messages(Unversioned()); - state BinaryWriter messages2(Unversioned()); - state int sequence = -1; - state UID peekId; - state double queueStart = now(); - - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; - } - - if (req.sequence.present()) { - try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; - if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && - logData->peekTracker.find(peekId) == logData->peekTracker.end()) { - throw operation_obsolete(); - } - auto& trackerData = logData->peekTracker[peekId]; - if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); - } - auto seqBegin = trackerData.sequence_version.begin(); - // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. - while (trackerData.sequence_version.size() && - seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { - if (seqBegin->second.canBeSet()) { - seqBegin->second.sendError(operation_obsolete()); - } - trackerData.sequence_version.erase(seqBegin); - seqBegin = trackerData.sequence_version.begin(); - } - - if (trackerData.sequence_version.size() && sequence < seqBegin->first) { - throw operation_obsolete(); - } - - Future> fPrevPeekData = trackerData.sequence_version[sequence].getFuture(); - if (fPrevPeekData.isReady()) { - trackerData.unblockedPeeks++; - double t = now() - trackerData.lastUpdate; - if (t > trackerData.idleMax) - trackerData.idleMax = t; - trackerData.idleTime += t; - } - trackerData.lastUpdate = now(); - std::pair prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; - wait(yield()); - } catch (Error& e) { - if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); - return Void(); - } else { - throw; - } - } - } - - state double blockStart = now(); - - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - trackerData.lastUpdate = now(); - if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); - } - } - return Void(); - } - - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); - wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); - } - - if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) { - wait(self->concurrentLogRouterReads.take()); - state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); - wait(delay(0.0, TaskPriority::Low)); - } - - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { - // Reading spilled data will almost always imply that the storage server is >5s behind the rest - // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up - // slightly faster over keeping the rest of the cluster operating normally. - // txsTag is only ever peeked on recovery, and we would still wish to prioritize requests - // that impact recovery duration. - wait(delay(0, TaskPriority::TLogSpilledPeekReply)); - } - - state double workStart = now(); - - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { - TLogPeekReply rep; - rep.maxKnownVersion = logData->version.get(); - rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; - rep.popped = poppedVer; - rep.end = poppedVer; - rep.onlySpilled = false; - - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - trackerData.lastUpdate = now(); - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); - if (!sequenceData.isSet()) - sequenceData.sendError(operation_obsolete()); - return Void(); - } - if (sequenceData.isSet()) { - if (sequenceData.getFuture().get().first != rep.end) { - TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); - } - rep.begin = req.begin; - } - - req.reply.send(rep); - return Void(); - } - - state Version endVersion = logData->version.get() + 1; - state bool onlySpilled = false; - - // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { - // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We - // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if - // an initial attempt to read from disk results in insufficient data and the required data is no longer in - // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the - // result? - - if (req.onlySpilled) { - endVersion = logData->persistentDataDurableVersion + 1; - } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages2, endVersion); - } - - if (logData->shouldSpillByValue(req.tag)) { - RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->DESIRED_TOTAL_BYTES, - SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - - for (auto& kv : kvs) { - auto ver = decodeTagMessagesKey(kv.key); - messages << VERSION_HEADER << ver; - messages.serializeBytes(kv.value); - } - - if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; - onlySpilled = true; - } else { - messages.serializeBytes(messages2.toValue()); - } - } else { - // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. - RangeResult kvrefs = wait(self->persistentData->readRange( - KeyRangeRef( - persistTagMessageRefsKey(logData->logId, req.tag, req.begin), - persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), - SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); - - state std::vector> commitLocations; - state bool earlyEnd = false; - uint32_t mutationBytes = 0; - state uint64_t commitBytes = 0; - state Version firstVersion = std::numeric_limits::max(); - for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) { - auto& kv = kvrefs[i]; - VectorRef spilledData; - BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion)); - r >> spilledData; - for (const SpilledData& sd : spilledData) { - if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - earlyEnd = true; - break; - } - if (sd.version >= req.begin) { - firstVersion = std::min(firstVersion, sd.version); - const IDiskQueue::location end = sd.start.lo + sd.length; - commitLocations.emplace_back(sd.start, end); - // This isn't perfect, because we aren't accounting for page boundaries, but should be - // close enough. - commitBytes += sd.length; - mutationBytes += sd.mutationBytes; - } - } - if (earlyEnd) - break; - } - earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1); - wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes)); - state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes); - state std::vector>> messageReads; - messageReads.reserve(commitLocations.size()); - for (const auto& pair : commitLocations) { - messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True)); - } - commitLocations.clear(); - wait(waitForAll(messageReads)); - - state Version lastRefMessageVersion = 0; - state int index = 0; - loop { - if (index >= messageReads.size()) - break; - Standalone queueEntryData = messageReads[index].get(); - uint8_t valid; - const uint32_t length = *(uint32_t*)queueEntryData.begin(); - queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4); - BinaryReader rd(queueEntryData, IncludeVersion()); - state TLogQueueEntry entry; - rd >> entry >> valid; - ASSERT(valid == 0x01); - ASSERT(length + sizeof(valid) == queueEntryData.size()); - - messages << VERSION_HEADER << entry.version; - - std::vector rawMessages = - wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags)); - for (const StringRef& msg : rawMessages) { - messages.serializeBytes(msg); - DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg) - .detail("UID", self->dbgid) - .detail("LogId", logData->logId) - .detail("PeekTag", req.tag); - } - - lastRefMessageVersion = entry.version; - index++; - } - - messageReads.clear(); - memoryReservation.release(); - - if (earlyEnd) { - endVersion = lastRefMessageVersion + 1; - onlySpilled = true; - } else { - messages.serializeBytes(messages2.toValue()); - } - } - } else { - if (req.onlySpilled) { - endVersion = logData->persistentDataDurableVersion + 1; - } else { - peekMessagesFromMemory(logData, req.tag, req.begin, messages, endVersion); - } - - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); - } - - TLogPeekReply reply; - reply.maxKnownVersion = logData->version.get(); - reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); - reply.end = endVersion; - reply.onlySpilled = onlySpilled; - - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()). - // detail("BeginVer", req.begin).detail("EndVer", reply.end). - // detail("MsgBytes", reply.messages.expectedSize()). - // detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); - - if (req.sequence.present()) { - auto& trackerData = logData->peekTracker[peekId]; - trackerData.lastUpdate = now(); - - double queueT = blockStart - queueStart; - double blockT = workStart - blockStart; - double workT = now() - workStart; - - trackerData.totalPeeks++; - trackerData.replyBytes += reply.messages.size(); - - if (queueT > trackerData.queueMax) - trackerData.queueMax = queueT; - if (blockT > trackerData.blockMax) - trackerData.blockMax = blockT; - if (workT > trackerData.workMax) - trackerData.workMax = workT; - - trackerData.queueTime += queueT; - trackerData.blockTime += blockT; - trackerData.workTime += workT; - - auto& sequenceData = trackerData.sequence_version[sequence + 1]; - if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); - if (!sequenceData.isSet()) { - // It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next - // request might still be in the window of active requests, but LogSystemPeekCursor will - // throw away all future responses upon getting an operation_obsolete(), so computing a - // response will probably be a waste of CPU. - sequenceData.sendError(operation_obsolete()); - } - return Void(); - } - if (sequenceData.isSet()) { - trackerData.duplicatePeeks++; - if (sequenceData.getFuture().get().first != reply.end) { - TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); - return Void(); - } - } else { - sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); - } - reply.begin = req.begin; - } - - req.reply.send(reply); - return Void(); -} - - ACTOR Future doQueueCommit(TLogData* self, Reference logData, std::vector> missingFinalCommit) { @@ -2814,7 +2474,8 @@ ACTOR Future serveTLogInterface(TLogData* self, logData->addActor.send(tLogPeekStream(self, req, logData)); } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 25c6e32b80..7a8f1e24f7 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -184,7 +184,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( FAST_NETWORK_LATENCY, 800e-6 ); init( SLOW_NETWORK_LATENCY, 100e-3 ); init( MAX_CLOGGING_LATENCY, 0 ); if( randomize && BUGGIFY ) MAX_CLOGGING_LATENCY = 0.1 * deterministicRandom()->random01(); - init( MAX_BUGGIFIED_DELAY, 0 ); // if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01(); + init( MAX_BUGGIFIED_DELAY, 0 ); if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01(); init( SIM_CONNECT_ERROR_MODE, deterministicRandom()->randomInt(0,3) ); //Tracefiles