diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 3a6d691d04..2211a4f865 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -547,7 +547,10 @@ struct LogData : NonCopyable, public ReferenceCounted { Counter bytesDurable; Counter blockingPeeks; Counter blockingPeekTimeouts; + Counter emptyPeeks; + Counter nonEmptyPeeks; std::map blockingPeekLatencies; + std::map peekVersionCounts; // NOTE: Doesn't capture versions peeked from disk UID logId; ProtocolVersion protocolVersion; @@ -630,7 +633,8 @@ struct LogData : NonCopyable, public ReferenceCounted { durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), minPoppedTag(invalidTag), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc), - blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), protocolVersion(protocolVersion), + blockingPeekTimeouts("BlockingPeekTimeouts", cc), emptyPeeks("EmptyPeeks", cc), + nonEmptyPeeks("NonEmptyPeeks", cc), logId(interf.id()), protocolVersion(protocolVersion), newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1), logSystem(new AsyncVar>()), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid), @@ -1575,6 +1579,7 @@ void peekMessagesFromMemory(Reference self, std::make_pair(begin, LengthPrefixedStringRef()), [](const auto& l, const auto& r) -> bool { return l.first < r.first; }); + int versionCount = 0; Version currentVersion = -1; for (; it != deque.end(); ++it) { if (it->first != currentVersion) { @@ -1596,6 +1601,24 @@ void peekMessagesFromMemory(Reference self, DEBUG_TAGS_AND_MESSAGE( "TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId) .detail("PeekTag", tag); + versionCount++; + } + + // Update counters. + if (!versionCount) { + ++self->emptyPeeks; + } else { + ++self->nonEmptyPeeks; + + // TODO (version vector) check if this should be included in "status details" json + if (self->peekVersionCounts.find(tag) == self->peekVersionCounts.end()) { + UID ssID = nondeterministicRandom()->randomUniqueID(); + std::string s = "PeekVersionCounts " + tag.toString(); + self->peekVersionCounts.try_emplace( + tag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE); + } + LatencySample& sample = self->peekVersionCounts.at(tag); + sample.addMeasurement(versionCount); } } diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 15790cd8b2..79f43e66d6 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -256,6 +256,9 @@ struct MasterData : NonCopyable, ReferenceCounted { Counter backupWorkerDoneRequests; Counter getLiveCommittedVersionRequests; Counter reportLiveCommittedVersionRequests; + // This counter gives an estimate of the number of non-empty peeks that storage servers + // should do from tlogs (in the worst case, ignoring blocking peek timeouts). + Counter versionVectorTagUpdates; LatencySample versionVectorSizeOnCVReply; Future logger; @@ -287,6 +290,7 @@ struct MasterData : NonCopyable, ReferenceCounted { backupWorkerDoneRequests("BackupWorkerDoneRequests", cc), getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc), reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc), + versionVectorTagUpdates("VersionVectorTagUpdates", cc), versionVectorSizeOnCVReply("VersionVectorSizeOnCVReply", dbgid, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, @@ -1250,10 +1254,15 @@ ACTOR Future provideVersions(Reference self) { void updateLiveCommittedVersion(Reference self, ReportRawCommittedVersionRequest req) { self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion); + if (req.version > self->liveCommittedVersion.get()) { if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); self->ssVersionVector.setVersion(req.writtenTags.get(), req.version); + + if (req.writtenTags.present()) { + self->versionVectorTagUpdates += req.writtenTags.get().size(); + } } self->databaseLocked = req.locked; self->proxyMetadataVersion = req.metadataVersion;