Track run loop busyness and report it in status.

This commit is contained in:
A.J. Beamon 2019-06-26 14:03:02 -07:00
parent cc69bc14ca
commit 7f23814841
7 changed files with 109 additions and 62 deletions

View File

@ -187,7 +187,8 @@
"megabits_received":{
"hz":0.0
}
}
},
"run_loop_busy":0.2 // fraction of time the run loop was busy
}
},
"old_logs":[

View File

@ -17,6 +17,8 @@ Fixes
Status
------
* Added ``run_loop_busy`` to the ``processes`` section to record the fraction of time the run loop is busy. `(PR #) <https://github.com/apple/foundationdb/pull/>`_.
Bindings
--------

View File

@ -207,7 +207,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"megabits_received":{
"hz":0.0
}
}
},
"run_loop_busy":0.2
}
},
"old_logs":[

View File

@ -315,10 +315,10 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<Work
statusObj["memory"] = memoryObj;
JsonBuilderObject cpuObj;
double cpu_seconds = event.getDouble("CPUSeconds");
double cpuSeconds = event.getDouble("CPUSeconds");
double elapsed = event.getDouble("Elapsed");
if (elapsed > 0){
cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpu_seconds / elapsed, 1.0));
cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpuSeconds / elapsed, 1.0));
}
statusObj["cpu"] = cpuObj;
@ -541,8 +541,8 @@ struct RolesInfo {
ACTOR static Future<JsonBuilderObject> processStatusFetcher(
Reference<AsyncVar<struct ServerDBInfo>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
WorkerEvents mMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, WorkerEvents programStarts,
std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
WorkerEvents mMetrics, WorkerEvents nMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts, std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
vector<std::pair<TLogInterface, EventMap>> tLogs, vector<std::pair<MasterProxyInterface, EventMap>> proxies,
Database cx, Optional<DatabaseConfiguration> configuration, Optional<Key> healthyZone, std::set<std::string>* incomplete_reasons) {
@ -668,84 +668,84 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
ASSERT(pMetrics.count(workerItr->interf.address()));
NetworkAddress address = workerItr->interf.address();
const TraceEventFields& event = pMetrics[workerItr->interf.address()];
const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()];
statusObj["address"] = address.toString();
JsonBuilderObject memoryObj;
if (event.size() > 0) {
std::string zoneID = event.getValue("ZoneID");
if (processMetrics.size() > 0) {
std::string zoneID = processMetrics.getValue("ZoneID");
statusObj["fault_domain"] = zoneID;
if(healthyZone.present() && healthyZone == workerItr->interf.locality.zoneId()) {
statusObj["under_maintenance"] = true;
}
std::string MachineID = event.getValue("MachineID");
std::string MachineID = processMetrics.getValue("MachineID");
statusObj["machine_id"] = MachineID;
statusObj["locality"] = getLocalityInfo(workerItr->interf.locality);
statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds"));
statusObj.setKeyRawNumber("uptime_seconds", processMetrics.getValue("UptimeSeconds"));
// rates are calculated over the last elapsed seconds
double elapsed = event.getDouble("Elapsed");
double cpu_seconds = event.getDouble("CPUSeconds");
double diskIdleSeconds = event.getDouble("DiskIdleSeconds");
double diskReads = event.getDouble("DiskReads");
double diskWrites = event.getDouble("DiskWrites");
double processMetricsElapsed = processMetrics.getDouble("Elapsed");
double cpuSeconds = processMetrics.getDouble("CPUSeconds");
double diskIdleSeconds = processMetrics.getDouble("DiskIdleSeconds");
double diskReads = processMetrics.getDouble("DiskReads");
double diskWrites = processMetrics.getDouble("DiskWrites");
JsonBuilderObject diskObj;
if (elapsed > 0){
if (processMetricsElapsed > 0) {
JsonBuilderObject cpuObj;
cpuObj["usage_cores"] = std::max(0.0, cpu_seconds / elapsed);
cpuObj["usage_cores"] = std::max(0.0, cpuSeconds / processMetricsElapsed);
statusObj["cpu"] = cpuObj;
diskObj["busy"] = std::max(0.0, std::min((elapsed - diskIdleSeconds) / elapsed, 1.0));
diskObj["busy"] = std::max(0.0, std::min((processMetricsElapsed - diskIdleSeconds) / processMetricsElapsed, 1.0));
JsonBuilderObject readsObj;
readsObj.setKeyRawNumber("counter",event.getValue("DiskReadsCount"));
if (elapsed > 0)
readsObj["hz"] = diskReads / elapsed;
readsObj.setKeyRawNumber("sectors",event.getValue("DiskReadSectors"));
readsObj.setKeyRawNumber("counter", processMetrics.getValue("DiskReadsCount"));
if (processMetricsElapsed > 0)
readsObj["hz"] = diskReads / processMetricsElapsed;
readsObj.setKeyRawNumber("sectors", processMetrics.getValue("DiskReadSectors"));
JsonBuilderObject writesObj;
writesObj.setKeyRawNumber("counter",event.getValue("DiskWritesCount"));
if (elapsed > 0)
writesObj["hz"] = diskWrites / elapsed;
writesObj.setKeyRawNumber("sectors",event.getValue("DiskWriteSectors"));
writesObj.setKeyRawNumber("counter", processMetrics.getValue("DiskWritesCount"));
if (processMetricsElapsed > 0)
writesObj["hz"] = diskWrites / processMetricsElapsed;
writesObj.setKeyRawNumber("sectors", processMetrics.getValue("DiskWriteSectors"));
diskObj["reads"] = readsObj;
diskObj["writes"] = writesObj;
}
diskObj.setKeyRawNumber("total_bytes",event.getValue("DiskTotalBytes"));
diskObj.setKeyRawNumber("free_bytes",event.getValue("DiskFreeBytes"));
diskObj.setKeyRawNumber("total_bytes", processMetrics.getValue("DiskTotalBytes"));
diskObj.setKeyRawNumber("free_bytes", processMetrics.getValue("DiskFreeBytes"));
statusObj["disk"] = diskObj;
JsonBuilderObject networkObj;
networkObj.setKeyRawNumber("current_connections",event.getValue("CurrentConnections"));
networkObj.setKeyRawNumber("current_connections", processMetrics.getValue("CurrentConnections"));
JsonBuilderObject connections_established;
connections_established.setKeyRawNumber("hz",event.getValue("ConnectionsEstablished"));
connections_established.setKeyRawNumber("hz", processMetrics.getValue("ConnectionsEstablished"));
networkObj["connections_established"] = connections_established;
JsonBuilderObject connections_closed;
connections_closed.setKeyRawNumber("hz",event.getValue("ConnectionsClosed"));
connections_closed.setKeyRawNumber("hz", processMetrics.getValue("ConnectionsClosed"));
networkObj["connections_closed"] = connections_closed;
JsonBuilderObject connection_errors;
connection_errors.setKeyRawNumber("hz",event.getValue("ConnectionErrors"));
connection_errors.setKeyRawNumber("hz", processMetrics.getValue("ConnectionErrors"));
networkObj["connection_errors"] = connection_errors;
JsonBuilderObject megabits_sent;
megabits_sent.setKeyRawNumber("hz",event.getValue("MbpsSent"));
megabits_sent.setKeyRawNumber("hz", processMetrics.getValue("MbpsSent"));
networkObj["megabits_sent"] = megabits_sent;
JsonBuilderObject megabits_received;
megabits_received.setKeyRawNumber("hz",event.getValue("MbpsReceived"));
megabits_received.setKeyRawNumber("hz", processMetrics.getValue("MbpsReceived"));
networkObj["megabits_received"] = megabits_received;
statusObj["network"] = networkObj;
memoryObj.setKeyRawNumber("used_bytes",event.getValue("Memory"));
memoryObj.setKeyRawNumber("unused_allocated_memory",event.getValue("UnusedAllocatedMemory"));
memoryObj.setKeyRawNumber("used_bytes", processMetrics.getValue("Memory"));
memoryObj.setKeyRawNumber("unused_allocated_memory", processMetrics.getValue("UnusedAllocatedMemory"));
}
if (programStarts.count(address)) {
@ -820,6 +820,19 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
if(workerItr->degraded) {
statusObj["degraded"] = true;
}
const TraceEventFields& networkMetrics = nMetrics[workerItr->interf.address()];
double networkMetricsElapsed = networkMetrics.getDouble("Elapsed");
try {
double runLoopBusy = networkMetrics.getDouble("PriorityBusy1");
statusObj["run_loop_busy"] = runLoopBusy / networkMetricsElapsed;
}
catch(Error &e) {
// This should only happen very early in the process lifetime before priority bin info has been populated
incomplete_reasons->insert("Cannot retrieve run loop busyness.");
}
}
catch (Error& e){
// Something strange occurred, process list is incomplete but what was built so far, if anything, will be returned.
@ -1905,6 +1918,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
std::vector< Future< Optional <std::pair<WorkerEvents, std::set<std::string>>> > > futures;
futures.push_back(latestEventOnWorkers(workers, "MachineMetrics"));
futures.push_back(latestEventOnWorkers(workers, "ProcessMetrics"));
futures.push_back(latestEventOnWorkers(workers, "NetworkMetrics"));
futures.push_back(latestErrorOnWorkers(workers));
futures.push_back(latestEventOnWorkers(workers, "TraceFileOpenError"));
futures.push_back(latestEventOnWorkers(workers, "ProgramStart"));
@ -1944,9 +1958,10 @@ ACTOR Future<StatusReply> clusterGetStatus(
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();
// process metrics
state WorkerEvents pMetrics = workerEventsVec[1].present() ? workerEventsVec[1].get().first : WorkerEvents();
state WorkerEvents latestError = workerEventsVec[2].present() ? workerEventsVec[2].get().first : WorkerEvents();
state WorkerEvents traceFileOpenErrors = workerEventsVec[3].present() ? workerEventsVec[3].get().first : WorkerEvents();
state WorkerEvents programStarts = workerEventsVec[4].present() ? workerEventsVec[4].get().first : WorkerEvents();
state WorkerEvents networkMetrics = workerEventsVec[2].present() ? workerEventsVec[2].get().first : WorkerEvents();
state WorkerEvents latestError = workerEventsVec[3].present() ? workerEventsVec[3].get().first : WorkerEvents();
state WorkerEvents traceFileOpenErrors = workerEventsVec[4].present() ? workerEventsVec[4].get().first : WorkerEvents();
state WorkerEvents programStarts = workerEventsVec[5].present() ? workerEventsVec[5].get().first : WorkerEvents();
state JsonBuilderObject statusObj;
if(db->get().recoveryCount > 0) {
@ -2089,7 +2104,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["layers"] = layers;
}
JsonBuilderObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, proxies, cx, configuration, loadResult.present() ? loadResult.get().healthyZone : Optional<Key>(), &status_incomplete_reasons));
JsonBuilderObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, networkMetrics,
latestError, traceFileOpenErrors, programStarts,
processIssues, storageServers, tLogs, proxies, cx,
configuration, loadResult.present() ? loadResult.get().healthyZone : Optional<Key>(),
&status_incomplete_reasons));
statusObj["processes"] = processStatus;
statusObj["clients"] = clientStatusFetcher(clientVersionMap, clientStatusInfoMap);

View File

@ -168,7 +168,6 @@ public:
double lastPriorityTrackTime;
int lastMinTaskID;
double priorityTimer[NetworkMetrics::PRIORITY_BINS];
std::priority_queue<OrderedTask, std::vector<OrderedTask>> ready;
ThreadSafeQueue<OrderedTask> threadReady;
@ -577,7 +576,8 @@ void Net2::run() {
if (runFunc) {
tsc_begin = __rdtsc();
taskBegin = timer_monotonic();
taskBegin = nnow;
trackMinPriority(TaskRunCycleFunction, taskBegin);
runFunc();
checkForSlowTask(tsc_begin, __rdtsc(), timer_monotonic() - taskBegin, TaskRunCycleFunction);
}
@ -591,8 +591,11 @@ void Net2::run() {
++countWontSleep;
if (b) {
sleepTime = 1e99;
if (!timers.empty())
sleepTime = timers.top().at - timer_monotonic(); // + 500e-6?
double sleepStart = timer_monotonic();
if (!timers.empty()) {
sleepTime = timers.top().at - sleepStart; // + 500e-6?
}
trackMinPriority(0, sleepStart);
}
awakeMetric = false;
@ -607,7 +610,6 @@ void Net2::run() {
if ((now-nnow) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && nondeterministicRandom()->random01() < (now-nnow)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE)
TraceEvent("SomewhatSlowRunLoopTop").detail("Elapsed", now - nnow);
if (sleepTime) trackMinPriority( 0, now );
while (!timers.empty() && timers.top().at < now) {
++countTimers;
ready.push( timers.top() );
@ -641,7 +643,7 @@ void Net2::run() {
if (check_yield(TaskMaxPriority, true)) { ++countYields; break; }
}
nnow = timer_monotonic();
trackMinPriority(minTaskID, now);
#if defined(__linux__)
if(FLOW_KNOBS->SLOWTASK_PROFILING_INTERVAL > 0) {
@ -685,11 +687,10 @@ void Net2::run() {
net2liveness.fetch_add(1);
}
#endif
nnow = timer_monotonic();
if ((nnow-now) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && nondeterministicRandom()->random01() < (nnow-now)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE)
TraceEvent("SomewhatSlowRunLoopBottom").detail("Elapsed", nnow - now); // This includes the time spent running tasks
trackMinPriority( minTaskID, nnow );
}
#ifdef WIN32
@ -698,17 +699,22 @@ void Net2::run() {
}
void Net2::trackMinPriority( int minTaskID, double now ) {
if (minTaskID != lastMinTaskID)
if (minTaskID != lastMinTaskID) {
for(int c=0; c<NetworkMetrics::PRIORITY_BINS; c++) {
int64_t pri = networkMetrics.priorityBins[c];
if (pri >= minTaskID && pri < lastMinTaskID) { // busy -> idle
double busyFor = lastPriorityTrackTime - priorityTimer[c];
networkMetrics.secSquaredPriorityBlocked[c] += busyFor*busyFor;
if (pri > minTaskID && pri <= lastMinTaskID) { // busy -> idle
double busyFor = lastPriorityTrackTime - networkMetrics.priorityTimer[c];
networkMetrics.priorityBlocked[c] = false;
networkMetrics.priorityBlockedDuration[c] += busyFor;
networkMetrics.secSquaredPriorityBlocked[c] += busyFor * busyFor;
}
if (pri < minTaskID && pri >= lastMinTaskID) { // idle -> busy
priorityTimer[c] = now;
if (pri <= minTaskID && pri > lastMinTaskID) { // idle -> busy
networkMetrics.priorityBlocked[c] = true;
networkMetrics.priorityTimer[c] = now;
}
}
}
lastMinTaskID = minTaskID;
lastPriorityTrackTime = now;
}

View File

@ -59,8 +59,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
netData.init();
if (!DEBUG_DETERMINISM && currentStats.initialized) {
{
TraceEvent e(eventName.c_str());
e
TraceEvent(eventName.c_str())
.detail("Elapsed", currentStats.elapsed)
.detail("CPUSeconds", currentStats.processCPUSeconds)
.detail("MainThreadCPUSeconds", currentStats.mainThreadCPUSeconds)
@ -120,6 +119,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
TraceEvent n("NetworkMetrics");
n
.detail("Elapsed", currentStats.elapsed)
.detail("CantSleep", netData.countCantSleep - statState->networkState.countCantSleep)
.detail("WontSleep", netData.countWontSleep - statState->networkState.countWontSleep)
.detail("Yields", netData.countYields - statState->networkState.countYields)
@ -139,12 +139,27 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
.detail("PacketsGenerated", netData.countPacketsGenerated - statState->networkState.countPacketsGenerated)
.detail("WouldBlock", netData.countWouldBlock - statState->networkState.countWouldBlock);
for (int i = 0; i<NetworkMetrics::SLOW_EVENT_BINS; i++)
if (int c = g_network->networkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i])
for (int i = 0; i<NetworkMetrics::SLOW_EVENT_BINS; i++) {
if (int c = g_network->networkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) {
n.detail(format("SlowTask%dM", 1 << i).c_str(), c);
for (int i = 0; i<NetworkMetrics::PRIORITY_BINS; i++)
if (double x = g_network->networkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i])
n.detail(format("S2Pri%d", g_network->networkMetrics.priorityBins[i]).c_str(), x);
}
}
for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkMetrics.priorityBins[i] != 0; i++) {
if(g_network->networkMetrics.priorityBlocked[i]) {
double lastSegment = std::min(currentStats.elapsed, now() - g_network->networkMetrics.priorityTimer[i]);
g_network->networkMetrics.priorityBlockedDuration[i] += lastSegment;
g_network->networkMetrics.secSquaredPriorityBlocked[i] += lastSegment * lastSegment;
g_network->networkMetrics.priorityTimer[i] = now();
}
double blocked = g_network->networkMetrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i];
double s2Blocked = g_network->networkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i];
n.detail(format("PriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), blocked);
n.detail(format("SumOfSquaredPriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), s2Blocked);
}
n.trackLatest("NetworkMetrics");
}
if(machineMetrics) {

View File

@ -271,7 +271,10 @@ struct NetworkMetrics {
enum { PRIORITY_BINS = 9 };
int priorityBins[ PRIORITY_BINS ];
bool priorityBlocked[PRIORITY_BINS];
double priorityBlockedDuration[PRIORITY_BINS];
double secSquaredPriorityBlocked[PRIORITY_BINS];
double priorityTimer[PRIORITY_BINS];
double oldestAlternativesFailure;
double newestAlternativesFailure;