Add MasterMetrics periodic logging
This commit is contained in:
parent
2f61bf3c42
commit
18120d6b1a
|
@ -245,6 +245,15 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||||
|
|
||||||
std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
|
std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
|
||||||
|
|
||||||
|
CounterCollection cc;
|
||||||
|
Counter changeCoordinatorsRequests;
|
||||||
|
Counter getCommitVersionRequests;
|
||||||
|
Counter backupWorkerDoneRequests;
|
||||||
|
Counter getLiveCommittedVersionRequests;
|
||||||
|
Counter reportLiveCommittedVersionRequests;
|
||||||
|
|
||||||
|
Future<Void> logger;
|
||||||
|
|
||||||
MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
|
MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
|
||||||
MasterInterface const& myInterface,
|
MasterInterface const& myInterface,
|
||||||
ServerCoordinators const& coordinators,
|
ServerCoordinators const& coordinators,
|
||||||
|
@ -258,7 +267,13 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||||
lastEpochEnd(invalidVersion), liveCommittedVersion(invalidVersion), databaseLocked(false),
|
lastEpochEnd(invalidVersion), liveCommittedVersion(invalidVersion), databaseLocked(false),
|
||||||
minKnownCommittedVersion(invalidVersion), recoveryTransactionVersion(invalidVersion), lastCommitTime(0),
|
minKnownCommittedVersion(invalidVersion), recoveryTransactionVersion(invalidVersion), lastCommitTime(0),
|
||||||
registrationCount(0), version(invalidVersion), lastVersionTime(0), txnStateStore(0), memoryLimit(2e9),
|
registrationCount(0), version(invalidVersion), lastVersionTime(0), txnStateStore(0), memoryLimit(2e9),
|
||||||
addActor(addActor), hasConfiguration(false), recruitmentStalled(makeReference<AsyncVar<bool>>(false)) {
|
addActor(addActor), hasConfiguration(false), recruitmentStalled(makeReference<AsyncVar<bool>>(false)),
|
||||||
|
cc("Master", dbgid.toString()), changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
|
||||||
|
getCommitVersionRequests("GetCommitVersionRequests", cc),
|
||||||
|
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
|
||||||
|
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
|
||||||
|
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc) {
|
||||||
|
logger = traceCounters("MasterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "MasterMetrics");
|
||||||
if (forceRecovery && !myInterface.locality.dcId().present()) {
|
if (forceRecovery && !myInterface.locality.dcId().present()) {
|
||||||
TraceEvent(SevError, "ForcedRecoveryRequiresDcID");
|
TraceEvent(SevError, "ForcedRecoveryRequiresDcID");
|
||||||
forceRecovery = false;
|
forceRecovery = false;
|
||||||
|
@ -1095,6 +1110,8 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
|
||||||
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
|
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
|
||||||
self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes
|
self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes
|
||||||
|
|
||||||
|
++self->getCommitVersionRequests;
|
||||||
|
|
||||||
if (proxyItr == self->lastCommitProxyVersionReplies.end()) {
|
if (proxyItr == self->lastCommitProxyVersionReplies.end()) {
|
||||||
// Request from invalid proxy (e.g. from duplicate recruitment request)
|
// Request from invalid proxy (e.g. from duplicate recruitment request)
|
||||||
req.reply.send(Never());
|
req.reply.send(Never());
|
||||||
|
@ -1191,6 +1208,7 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
||||||
if (self->liveCommittedVersion == invalidVersion) {
|
if (self->liveCommittedVersion == invalidVersion) {
|
||||||
self->liveCommittedVersion = self->recoveryTransactionVersion;
|
self->liveCommittedVersion = self->recoveryTransactionVersion;
|
||||||
}
|
}
|
||||||
|
++self->getLiveCommittedVersionRequests;
|
||||||
GetRawCommittedVersionReply reply;
|
GetRawCommittedVersionReply reply;
|
||||||
reply.version = self->liveCommittedVersion;
|
reply.version = self->liveCommittedVersion;
|
||||||
reply.locked = self->databaseLocked;
|
reply.locked = self->databaseLocked;
|
||||||
|
@ -1206,6 +1224,7 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
||||||
self->databaseLocked = req.locked;
|
self->databaseLocked = req.locked;
|
||||||
self->proxyMetadataVersion = req.metadataVersion;
|
self->proxyMetadataVersion = req.metadataVersion;
|
||||||
}
|
}
|
||||||
|
++self->reportLiveCommittedVersionRequests;
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1374,6 +1393,7 @@ static std::set<int> const& normalMasterErrors() {
|
||||||
ACTOR Future<Void> changeCoordinators(Reference<MasterData> self) {
|
ACTOR Future<Void> changeCoordinators(Reference<MasterData> self) {
|
||||||
loop {
|
loop {
|
||||||
ChangeCoordinatorsRequest req = waitNext(self->myInterface.changeCoordinators.getFuture());
|
ChangeCoordinatorsRequest req = waitNext(self->myInterface.changeCoordinators.getFuture());
|
||||||
|
++self->changeCoordinatorsRequests;
|
||||||
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
|
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
|
||||||
|
|
||||||
while (!self->cstate.previousWrite.isReady()) {
|
while (!self->cstate.previousWrite.isReady()) {
|
||||||
|
@ -1981,6 +2001,7 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
|
||||||
if (self->logSystem.isValid() && self->logSystem->removeBackupWorker(req)) {
|
if (self->logSystem.isValid() && self->logSystem->removeBackupWorker(req)) {
|
||||||
self->registrationTrigger.trigger();
|
self->registrationTrigger.trigger();
|
||||||
}
|
}
|
||||||
|
++self->backupWorkerDoneRequests;
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
}
|
}
|
||||||
when(wait(collection)) {
|
when(wait(collection)) {
|
||||||
|
|
Loading…
Reference in New Issue