added additional logging on the logs and log routers
This commit is contained in:
parent
d54ba497f0
commit
8590b710bf
|
@ -63,7 +63,8 @@
|
|||
"resolver",
|
||||
"cluster_controller",
|
||||
"data_distributor",
|
||||
"ratekeeper"
|
||||
"ratekeeper",
|
||||
"router"
|
||||
]
|
||||
},
|
||||
"data_version":12341234,
|
||||
|
|
|
@ -84,7 +84,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"resolver",
|
||||
"cluster_controller",
|
||||
"data_distributor",
|
||||
"ratekeeper"
|
||||
"ratekeeper",
|
||||
"router"
|
||||
]
|
||||
},
|
||||
"data_version":12341234,
|
||||
|
|
|
@ -33,6 +33,21 @@
|
|||
#include "flow/Stats.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct LogRouterStats {
|
||||
CounterCollection cc;
|
||||
Future<Void> logger;
|
||||
|
||||
explicit LogRouterStats(UID id, NotifiedVersion* pVersion, NotifiedVersion* pMinPopped, Version* pMinKnownCommittedVersion, Version* pPoppedVersion)
|
||||
: cc("LogRouterStats", id.toString())
|
||||
{
|
||||
specialCounter(cc, "Version", [pVersion](){return pVersion->get(); });
|
||||
specialCounter(cc, "MinPopped", [pMinPopped](){return pMinPopped->get(); });
|
||||
specialCounter(cc, "MinKnownCommittedVersion", [pMinKnownCommittedVersion](){ return *pMinKnownCommittedVersion; });
|
||||
specialCounter(cc, "PoppedVersion", [pPoppedVersion](){ return *pPoppedVersion; });
|
||||
logger = traceCounters("LogRouterMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "LogRouterMetrics");
|
||||
}
|
||||
};
|
||||
|
||||
struct LogRouterData {
|
||||
struct TagData : NonCopyable, public ReferenceCounted<TagData> {
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> version_messages;
|
||||
|
@ -75,6 +90,7 @@ struct LogRouterData {
|
|||
};
|
||||
|
||||
UID dbgid;
|
||||
LogRouterStats stats;
|
||||
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
|
||||
NotifiedVersion version;
|
||||
NotifiedVersion minPopped;
|
||||
|
@ -104,7 +120,9 @@ struct LogRouterData {
|
|||
return newTagData;
|
||||
}
|
||||
|
||||
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) {
|
||||
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()),
|
||||
version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false),
|
||||
stats(dbgid, &version, &minPopped, &minKnownCommittedVersion, &poppedVersion) {
|
||||
//setup just enough of a logSet to be able to call getPushLocations
|
||||
logSet.logServers.resize(req.tLogLocalities.size());
|
||||
logSet.tLogPolicy = req.tLogPolicy;
|
||||
|
|
|
@ -433,6 +433,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);
|
||||
|
||||
specialCounter(cc, "Version", [this](){ return this->version.get(); });
|
||||
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
|
||||
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
|
||||
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
|
||||
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
|
||||
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
|
||||
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
|
||||
|
|
|
@ -608,6 +608,24 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
|
|||
roles.addRole("ratekeeper", db->get().ratekeeper.get());
|
||||
}
|
||||
|
||||
for(auto& tLogSet : db->get().logSystemConfig.tLogs) {
|
||||
for(auto& it : tLogSet.logRouters) {
|
||||
if(it.present()) {
|
||||
roles.addRole("router", it.interf());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(auto& old : db->get().logSystemConfig.oldTLogs) {
|
||||
for(auto& tLogSet : old.tLogs) {
|
||||
for(auto& it : tLogSet.logRouters) {
|
||||
if(it.present()) {
|
||||
roles.addRole("router", it.interf());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state std::vector<std::pair<MasterProxyInterface, EventMap>>::iterator proxy;
|
||||
for(proxy = proxies.begin(); proxy != proxies.end(); ++proxy) {
|
||||
roles.addRole( "proxy", proxy->first, proxy->second );
|
||||
|
|
|
@ -494,6 +494,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);
|
||||
|
||||
specialCounter(cc, "Version", [this](){ return this->version.get(); });
|
||||
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
|
||||
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
|
||||
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
|
||||
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
|
||||
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
|
||||
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
|
||||
|
|
Loading…
Reference in New Issue