- Capture metrics about empty/non-empty peeks done by storage servers

This commit is contained in:
Sreenath Bodagala 2021-10-26 14:37:46 +00:00
parent e69fcaf8d6
commit 4503b0a347
2 changed files with 33 additions and 1 deletions

View File

@ -547,7 +547,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Counter bytesDurable;
Counter blockingPeeks;
Counter blockingPeekTimeouts;
Counter emptyPeeks;
Counter nonEmptyPeeks;
std::map<Tag, LatencySample> blockingPeekLatencies;
std::map<Tag, LatencySample> peekVersionCounts; // NOTE: Doesn't capture versions peeked from disk
UID logId;
ProtocolVersion protocolVersion;
@ -630,7 +633,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0),
minPoppedTag(invalidTag), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc),
blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), protocolVersion(protocolVersion),
blockingPeekTimeouts("BlockingPeekTimeouts", cc), emptyPeeks("EmptyPeeks", cc),
nonEmptyPeeks("NonEmptyPeeks", cc), logId(interf.id()), protocolVersion(protocolVersion),
newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1),
logSystem(new AsyncVar<Reference<ILogSystem>>()), remoteTag(remoteTag), isPrimary(isPrimary),
logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid),
@ -1575,6 +1579,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
std::make_pair(begin, LengthPrefixedStringRef()),
[](const auto& l, const auto& r) -> bool { return l.first < r.first; });
int versionCount = 0;
Version currentVersion = -1;
for (; it != deque.end(); ++it) {
if (it->first != currentVersion) {
@ -1596,6 +1601,24 @@ void peekMessagesFromMemory(Reference<LogData> self,
DEBUG_TAGS_AND_MESSAGE(
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId)
.detail("PeekTag", tag);
versionCount++;
}
// Update counters.
if (!versionCount) {
++self->emptyPeeks;
} else {
++self->nonEmptyPeeks;
// TODO (version vector) check if this should be included in "status details" json
if (self->peekVersionCounts.find(tag) == self->peekVersionCounts.end()) {
UID ssID = nondeterministicRandom()->randomUniqueID();
std::string s = "PeekVersionCounts " + tag.toString();
self->peekVersionCounts.try_emplace(
tag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
}
LatencySample& sample = self->peekVersionCounts.at(tag);
sample.addMeasurement(versionCount);
}
}

View File

@ -256,6 +256,9 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Counter backupWorkerDoneRequests;
Counter getLiveCommittedVersionRequests;
Counter reportLiveCommittedVersionRequests;
// This counter gives an estimate of the number of non-empty peeks that storage servers
// should do from tlogs (in the worst case, ignoring blocking peek timeouts).
Counter versionVectorTagUpdates;
LatencySample versionVectorSizeOnCVReply;
Future<Void> logger;
@ -287,6 +290,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
versionVectorTagUpdates("VersionVectorTagUpdates", cc),
versionVectorSizeOnCVReply("VersionVectorSizeOnCVReply",
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -1250,10 +1254,15 @@ ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
void updateLiveCommittedVersion(Reference<MasterData> self, ReportRawCommittedVersionRequest req) {
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
if (req.version > self->liveCommittedVersion.get()) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
self->ssVersionVector.setVersion(req.writtenTags.get(), req.version);
if (req.writtenTags.present()) {
self->versionVectorTagUpdates += req.writtenTags.get().size();
}
}
self->databaseLocked = req.locked;
self->proxyMetadataVersion = req.metadataVersion;