diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 3608b26dd0..99ac850b27 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -63,7 +63,8 @@ "resolver", "cluster_controller", "data_distributor", - "ratekeeper" + "ratekeeper", + "router" ] }, "data_version":12341234, diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 02c31634a3..f8303e40bf 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -84,7 +84,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "resolver", "cluster_controller", "data_distributor", - "ratekeeper" + "ratekeeper", + "router" ] }, "data_version":12341234, diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 0341d31810..12b3cde9cf 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -33,6 +33,21 @@ #include "flow/Stats.h" #include "flow/actorcompiler.h" // This must be the last #include. +struct LogRouterStats { + CounterCollection cc; + Future logger; + + explicit LogRouterStats(UID id, NotifiedVersion* pVersion, NotifiedVersion* pMinPopped, Version* pMinKnownCommittedVersion, Version* pPoppedVersion) + : cc("LogRouterStats", id.toString()) + { + specialCounter(cc, "Version", [pVersion](){return pVersion->get(); }); + specialCounter(cc, "MinPopped", [pMinPopped](){return pMinPopped->get(); }); + specialCounter(cc, "MinKnownCommittedVersion", [pMinKnownCommittedVersion](){ return *pMinKnownCommittedVersion; }); + specialCounter(cc, "PoppedVersion", [pPoppedVersion](){ return *pPoppedVersion; }); + logger = traceCounters("LogRouterMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "LogRouterMetrics"); + } +}; + struct LogRouterData { struct TagData : NonCopyable, public ReferenceCounted { std::deque> version_messages; @@ -75,6 +90,7 @@ struct LogRouterData { }; UID dbgid; + LogRouterStats stats; Reference>> logSystem; NotifiedVersion version; NotifiedVersion minPopped; @@ -104,7 +120,9 @@ struct LogRouterData { return newTagData; } - LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) { + LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), + version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false), + stats(dbgid, &version, &minPopped, &minKnownCommittedVersion, &poppedVersion) { //setup just enough of a logSet to be able to call getPushLocations logSet.logServers.resize(req.tLogLocalities.size()); logSet.tLogPolicy = req.tLogPolicy; diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 6ac0f68c4a..80bbbf38f5 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -433,6 +433,10 @@ struct LogData : NonCopyable, public ReferenceCounted { queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id); specialCounter(cc, "Version", [this](){ return this->version.get(); }); + specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); }); + specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; }); + specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; }); + specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; }); specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; }); specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; }); specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; }); diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index d8ab27ff1e..60b760abff 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -608,6 +608,24 @@ ACTOR static Future processStatusFetcher( roles.addRole("ratekeeper", db->get().ratekeeper.get()); } + for(auto& tLogSet : db->get().logSystemConfig.tLogs) { + for(auto& it : tLogSet.logRouters) { + if(it.present()) { + roles.addRole("router", it.interf()); + } + } + } + + for(auto& old : db->get().logSystemConfig.oldTLogs) { + for(auto& tLogSet : old.tLogs) { + for(auto& it : tLogSet.logRouters) { + if(it.present()) { + roles.addRole("router", it.interf()); + } + } + } + } + state std::vector>::iterator proxy; for(proxy = proxies.begin(); proxy != proxies.end(); ++proxy) { roles.addRole( "proxy", proxy->first, proxy->second ); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 81d542a11f..0e1b0b6612 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -494,6 +494,10 @@ struct LogData : NonCopyable, public ReferenceCounted { queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id); specialCounter(cc, "Version", [this](){ return this->version.get(); }); + specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); }); + specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; }); + specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; }); + specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; }); specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; }); specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; }); specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });