parent
48990058a3
commit
8427e40cbe
|
@ -99,7 +99,10 @@ ACTOR Future<Void> traceCounters(std::string traceEventName,
|
||||||
for (ICounter* c : counters->counters)
|
for (ICounter* c : counters->counters)
|
||||||
c->resetInterval();
|
c->resetInterval();
|
||||||
|
|
||||||
state Reference<EventCacheHolder> tempEventHolder = makeReference<EventCacheHolder>(trackLatestName);
|
state Reference<EventCacheHolder> traceEventHolder;
|
||||||
|
if (!trackLatestName.empty()) {
|
||||||
|
traceEventHolder = makeReference<EventCacheHolder>(trackLatestName);
|
||||||
|
}
|
||||||
|
|
||||||
state double last_interval = now();
|
state double last_interval = now();
|
||||||
|
|
||||||
|
@ -111,7 +114,7 @@ ACTOR Future<Void> traceCounters(std::string traceEventName,
|
||||||
decorator(te);
|
decorator(te);
|
||||||
|
|
||||||
if (!trackLatestName.empty()) {
|
if (!trackLatestName.empty()) {
|
||||||
te.trackLatest(tempEventHolder->trackingKey);
|
te.trackLatest(traceEventHolder->trackingKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
last_interval = now();
|
last_interval = now();
|
||||||
|
|
|
@ -3071,7 +3071,7 @@ public:
|
||||||
Counter registerMasterRequests;
|
Counter registerMasterRequests;
|
||||||
Counter statusRequests;
|
Counter statusRequests;
|
||||||
|
|
||||||
Reference<EventCacheHolder> clusterControllerEventHolder;
|
Reference<EventCacheHolder> recruitedMasterWorkerEventHolder;
|
||||||
|
|
||||||
ClusterControllerData(ClusterControllerFullInterface const& ccInterface,
|
ClusterControllerData(ClusterControllerFullInterface const& ccInterface,
|
||||||
LocalityData const& locality,
|
LocalityData const& locality,
|
||||||
|
@ -3088,7 +3088,7 @@ public:
|
||||||
getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
|
getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
|
||||||
registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
|
registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
|
||||||
statusRequests("StatusRequests", clusterControllerMetrics),
|
statusRequests("StatusRequests", clusterControllerMetrics),
|
||||||
clusterControllerEventHolder(makeReference<EventCacheHolder>("RecruitedMasterWorker")) {
|
recruitedMasterWorkerEventHolder(makeReference<EventCacheHolder>("RecruitedMasterWorker")) {
|
||||||
auto serverInfo = ServerDBInfo();
|
auto serverInfo = ServerDBInfo();
|
||||||
serverInfo.id = deterministicRandom()->randomUniqueID();
|
serverInfo.id = deterministicRandom()->randomUniqueID();
|
||||||
serverInfo.infoGeneration = ++db.dbInfoCount;
|
serverInfo.infoGeneration = ++db.dbInfoCount;
|
||||||
|
|
|
@ -251,11 +251,11 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||||
|
|
||||||
Future<Void> logger;
|
Future<Void> logger;
|
||||||
|
|
||||||
Reference<EventCacheHolder> masterRecoveryStateHolder;
|
Reference<EventCacheHolder> masterRecoveryStateEventHolder;
|
||||||
Reference<EventCacheHolder> masterRecoveryGenerationsHolder;
|
Reference<EventCacheHolder> masterRecoveryGenerationsEventHolder;
|
||||||
Reference<EventCacheHolder> masterRecoveryDurationHolder;
|
Reference<EventCacheHolder> masterRecoveryDurationEventHolder;
|
||||||
Reference<EventCacheHolder> masterRecoveryAvailableHolder;
|
Reference<EventCacheHolder> masterRecoveryAvailableEventHolder;
|
||||||
Reference<EventCacheHolder> recoveredConfigHolder;
|
Reference<EventCacheHolder> recoveredConfigEventHolder;
|
||||||
|
|
||||||
MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
|
MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
|
||||||
MasterInterface const& myInterface,
|
MasterInterface const& myInterface,
|
||||||
|
@ -278,11 +278,11 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||||
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
|
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
|
||||||
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
|
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
|
||||||
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
|
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
|
||||||
masterRecoveryStateHolder(makeReference<EventCacheHolder>("MasterRecoveryState")),
|
masterRecoveryStateEventHolder(makeReference<EventCacheHolder>("MasterRecoveryState")),
|
||||||
masterRecoveryGenerationsHolder(makeReference<EventCacheHolder>("MasterRecoveryGenerations")),
|
masterRecoveryGenerationsEventHolder(makeReference<EventCacheHolder>("MasterRecoveryGenerations")),
|
||||||
masterRecoveryDurationHolder(makeReference<EventCacheHolder>("MasterRecoveryDuration")),
|
masterRecoveryDurationEventHolder(makeReference<EventCacheHolder>("MasterRecoveryDuration")),
|
||||||
masterRecoveryAvailableHolder(makeReference<EventCacheHolder>("MasterRecoveryAvailable")),
|
masterRecoveryAvailableEventHolder(makeReference<EventCacheHolder>("MasterRecoveryAvailable")),
|
||||||
recoveredConfigHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
|
recoveredConfigEventHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
|
||||||
logger = traceCounters("MasterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "MasterMetrics");
|
logger = traceCounters("MasterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "MasterMetrics");
|
||||||
if (forceRecovery && !myInterface.locality.dcId().present()) {
|
if (forceRecovery && !myInterface.locality.dcId().present()) {
|
||||||
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
|
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
|
||||||
|
|
|
@ -685,7 +685,13 @@ public:
|
||||||
|
|
||||||
Optional<TagInfo> previousBusiestTag;
|
Optional<TagInfo> previousBusiestTag;
|
||||||
|
|
||||||
Reference<EventCacheHolder> transactionTagCounterHolder;
|
UID thisServerID;
|
||||||
|
|
||||||
|
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||||
|
|
||||||
|
TransactionTagCounter(UID thisServerID)
|
||||||
|
: thisServerID(thisServerID), busiestReadTagEventHolder(makeReference<EventCacheHolder>(
|
||||||
|
thisServerID.toString() + "/StorageServerSourceTLogID")) {}
|
||||||
|
|
||||||
int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||||
|
|
||||||
|
@ -706,7 +712,7 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void startNewInterval(UID id) {
|
void startNewInterval() {
|
||||||
double elapsed = now() - intervalStart;
|
double elapsed = now() - intervalStart;
|
||||||
previousBusiestTag.reset();
|
previousBusiestTag.reset();
|
||||||
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
||||||
|
@ -715,13 +721,13 @@ public:
|
||||||
previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
|
previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("BusiestReadTag", id)
|
TraceEvent("BusiestReadTag", thisServerID)
|
||||||
.detail("Elapsed", elapsed)
|
.detail("Elapsed", elapsed)
|
||||||
.detail("Tag", printable(busiestTag))
|
.detail("Tag", printable(busiestTag))
|
||||||
.detail("TagCost", busiestTagCount)
|
.detail("TagCost", busiestTagCount)
|
||||||
.detail("TotalSampledCost", intervalTotalSampledCount)
|
.detail("TotalSampledCost", intervalTotalSampledCount)
|
||||||
.detail("Reported", previousBusiestTag.present())
|
.detail("Reported", previousBusiestTag.present())
|
||||||
.trackLatest(id.toString() + "/BusiestReadTag");
|
.trackLatest(busiestReadTagEventHolder->trackingKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
intervalCounts.clear();
|
intervalCounts.clear();
|
||||||
|
@ -813,7 +819,7 @@ public:
|
||||||
}
|
}
|
||||||
} counters;
|
} counters;
|
||||||
|
|
||||||
Reference<EventCacheHolder> storageServerEventHolder;
|
Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder;
|
||||||
|
|
||||||
StorageServer(IKeyValueStore* storage,
|
StorageServer(IKeyValueStore* storage,
|
||||||
Reference<AsyncVar<ServerDBInfo> const> const& db,
|
Reference<AsyncVar<ServerDBInfo> const> const& db,
|
||||||
|
@ -853,9 +859,9 @@ public:
|
||||||
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
|
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
|
||||||
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
|
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
|
||||||
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), maxQueryQueue(0), counters(this),
|
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), maxQueryQueue(0), counters(this),
|
||||||
storageServerEventHolder(makeReference<EventCacheHolder>(ssi.id().toString() + "/BusiestReadTag")) {
|
storageServerSourceTLogIDEventHolder(
|
||||||
transactionTagCounter.transactionTagCounterHolder =
|
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")),
|
||||||
makeReference<EventCacheHolder>(ssi.id().toString() + +"/StorageServerSourceTLogID");
|
transactionTagCounter(ssi.id()) {
|
||||||
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
|
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
|
||||||
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
|
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
|
||||||
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
|
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
|
||||||
|
@ -3952,7 +3958,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||||
TraceEvent("StorageServerSourceTLogID", data->thisServerID)
|
TraceEvent("StorageServerSourceTLogID", data->thisServerID)
|
||||||
.detail("SourceTLogID",
|
.detail("SourceTLogID",
|
||||||
data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
|
data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
|
||||||
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
|
.trackLatest(data->storageServerSourceTLogIDEventHolder->trackingKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
data->noRecentUpdates.set(false);
|
data->noRecentUpdates.set(false);
|
||||||
|
@ -5023,9 +5029,9 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
||||||
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
|
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
|
||||||
self->actors.add(reportStorageServerState(self));
|
self->actors.add(reportStorageServerState(self));
|
||||||
|
|
||||||
self->transactionTagCounter.startNewInterval(self->thisServerID);
|
self->transactionTagCounter.startNewInterval();
|
||||||
self->actors.add(recurring([&]() { self->transactionTagCounter.startNewInterval(self->thisServerID); },
|
self->actors.add(
|
||||||
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
|
recurring([&]() { self->transactionTagCounter.startNewInterval(); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
|
||||||
|
|
||||||
self->coreStarted.send(Void());
|
self->coreStarted.send(Void());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue