From 7f2381484147057ef2cf8618c7fd944183610150 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 26 Jun 2019 14:03:02 -0700 Subject: [PATCH] Track run loop busyness and report it in status. --- .../source/mr-status-json-schemas.rst.inc | 3 +- documentation/sphinx/source/release-notes.rst | 2 + fdbclient/Schemas.cpp | 3 +- fdbserver/Status.actor.cpp | 97 +++++++++++-------- flow/Net2.actor.cpp | 34 ++++--- flow/SystemMonitor.cpp | 29 ++++-- flow/network.h | 3 + 7 files changed, 109 insertions(+), 62 deletions(-) diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 5b0099f142..ad5d6d95b5 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -187,7 +187,8 @@ "megabits_received":{ "hz":0.0 } - } + }, + "run_loop_busy":0.2 // fraction of time the run loop was busy } }, "old_logs":[ diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index f2a9813030..05654f5d14 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -17,6 +17,8 @@ Fixes Status ------ +* Added ``run_loop_busy`` to the ``processes`` section to record the fraction of time the run loop is busy. `(PR #) `_. + Bindings -------- diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 2e3db10c40..f1f2c5e305 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -207,7 +207,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "megabits_received":{ "hz":0.0 } - } + }, + "run_loop_busy":0.2 } }, "old_logs":[ diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 47c61aeb9f..2a8e8cf27c 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -315,10 +315,10 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector 0){ - cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpu_seconds / elapsed, 1.0)); + cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpuSeconds / elapsed, 1.0)); } statusObj["cpu"] = cpuObj; @@ -541,8 +541,8 @@ struct RolesInfo { ACTOR static Future processStatusFetcher( Reference> db, std::vector workers, WorkerEvents pMetrics, - WorkerEvents mMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, WorkerEvents programStarts, - std::map> processIssues, + WorkerEvents mMetrics, WorkerEvents nMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, + WorkerEvents programStarts, std::map> processIssues, vector> storageServers, vector> tLogs, vector> proxies, Database cx, Optional configuration, Optional healthyZone, std::set* incomplete_reasons) { @@ -668,84 +668,84 @@ ACTOR static Future processStatusFetcher( ASSERT(pMetrics.count(workerItr->interf.address())); NetworkAddress address = workerItr->interf.address(); - const TraceEventFields& event = pMetrics[workerItr->interf.address()]; + const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()]; statusObj["address"] = address.toString(); JsonBuilderObject memoryObj; - if (event.size() > 0) { - std::string zoneID = event.getValue("ZoneID"); + if (processMetrics.size() > 0) { + std::string zoneID = processMetrics.getValue("ZoneID"); statusObj["fault_domain"] = zoneID; if(healthyZone.present() && healthyZone == workerItr->interf.locality.zoneId()) { statusObj["under_maintenance"] = true; } - std::string MachineID = event.getValue("MachineID"); + std::string MachineID = processMetrics.getValue("MachineID"); statusObj["machine_id"] = MachineID; statusObj["locality"] = getLocalityInfo(workerItr->interf.locality); - statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds")); + statusObj.setKeyRawNumber("uptime_seconds", processMetrics.getValue("UptimeSeconds")); // rates are calculated over the last elapsed seconds - double elapsed = event.getDouble("Elapsed"); - double cpu_seconds = event.getDouble("CPUSeconds"); - double diskIdleSeconds = event.getDouble("DiskIdleSeconds"); - double diskReads = event.getDouble("DiskReads"); - double diskWrites = event.getDouble("DiskWrites"); + double processMetricsElapsed = processMetrics.getDouble("Elapsed"); + double cpuSeconds = processMetrics.getDouble("CPUSeconds"); + double diskIdleSeconds = processMetrics.getDouble("DiskIdleSeconds"); + double diskReads = processMetrics.getDouble("DiskReads"); + double diskWrites = processMetrics.getDouble("DiskWrites"); JsonBuilderObject diskObj; - if (elapsed > 0){ + if (processMetricsElapsed > 0) { JsonBuilderObject cpuObj; - cpuObj["usage_cores"] = std::max(0.0, cpu_seconds / elapsed); + cpuObj["usage_cores"] = std::max(0.0, cpuSeconds / processMetricsElapsed); statusObj["cpu"] = cpuObj; - diskObj["busy"] = std::max(0.0, std::min((elapsed - diskIdleSeconds) / elapsed, 1.0)); + diskObj["busy"] = std::max(0.0, std::min((processMetricsElapsed - diskIdleSeconds) / processMetricsElapsed, 1.0)); JsonBuilderObject readsObj; - readsObj.setKeyRawNumber("counter",event.getValue("DiskReadsCount")); - if (elapsed > 0) - readsObj["hz"] = diskReads / elapsed; - readsObj.setKeyRawNumber("sectors",event.getValue("DiskReadSectors")); + readsObj.setKeyRawNumber("counter", processMetrics.getValue("DiskReadsCount")); + if (processMetricsElapsed > 0) + readsObj["hz"] = diskReads / processMetricsElapsed; + readsObj.setKeyRawNumber("sectors", processMetrics.getValue("DiskReadSectors")); JsonBuilderObject writesObj; - writesObj.setKeyRawNumber("counter",event.getValue("DiskWritesCount")); - if (elapsed > 0) - writesObj["hz"] = diskWrites / elapsed; - writesObj.setKeyRawNumber("sectors",event.getValue("DiskWriteSectors")); + writesObj.setKeyRawNumber("counter", processMetrics.getValue("DiskWritesCount")); + if (processMetricsElapsed > 0) + writesObj["hz"] = diskWrites / processMetricsElapsed; + writesObj.setKeyRawNumber("sectors", processMetrics.getValue("DiskWriteSectors")); diskObj["reads"] = readsObj; diskObj["writes"] = writesObj; } - diskObj.setKeyRawNumber("total_bytes",event.getValue("DiskTotalBytes")); - diskObj.setKeyRawNumber("free_bytes",event.getValue("DiskFreeBytes")); + diskObj.setKeyRawNumber("total_bytes", processMetrics.getValue("DiskTotalBytes")); + diskObj.setKeyRawNumber("free_bytes", processMetrics.getValue("DiskFreeBytes")); statusObj["disk"] = diskObj; JsonBuilderObject networkObj; - networkObj.setKeyRawNumber("current_connections",event.getValue("CurrentConnections")); + networkObj.setKeyRawNumber("current_connections", processMetrics.getValue("CurrentConnections")); JsonBuilderObject connections_established; - connections_established.setKeyRawNumber("hz",event.getValue("ConnectionsEstablished")); + connections_established.setKeyRawNumber("hz", processMetrics.getValue("ConnectionsEstablished")); networkObj["connections_established"] = connections_established; JsonBuilderObject connections_closed; - connections_closed.setKeyRawNumber("hz",event.getValue("ConnectionsClosed")); + connections_closed.setKeyRawNumber("hz", processMetrics.getValue("ConnectionsClosed")); networkObj["connections_closed"] = connections_closed; JsonBuilderObject connection_errors; - connection_errors.setKeyRawNumber("hz",event.getValue("ConnectionErrors")); + connection_errors.setKeyRawNumber("hz", processMetrics.getValue("ConnectionErrors")); networkObj["connection_errors"] = connection_errors; JsonBuilderObject megabits_sent; - megabits_sent.setKeyRawNumber("hz",event.getValue("MbpsSent")); + megabits_sent.setKeyRawNumber("hz", processMetrics.getValue("MbpsSent")); networkObj["megabits_sent"] = megabits_sent; JsonBuilderObject megabits_received; - megabits_received.setKeyRawNumber("hz",event.getValue("MbpsReceived")); + megabits_received.setKeyRawNumber("hz", processMetrics.getValue("MbpsReceived")); networkObj["megabits_received"] = megabits_received; statusObj["network"] = networkObj; - memoryObj.setKeyRawNumber("used_bytes",event.getValue("Memory")); - memoryObj.setKeyRawNumber("unused_allocated_memory",event.getValue("UnusedAllocatedMemory")); + memoryObj.setKeyRawNumber("used_bytes", processMetrics.getValue("Memory")); + memoryObj.setKeyRawNumber("unused_allocated_memory", processMetrics.getValue("UnusedAllocatedMemory")); } if (programStarts.count(address)) { @@ -820,6 +820,19 @@ ACTOR static Future processStatusFetcher( if(workerItr->degraded) { statusObj["degraded"] = true; } + + const TraceEventFields& networkMetrics = nMetrics[workerItr->interf.address()]; + double networkMetricsElapsed = networkMetrics.getDouble("Elapsed"); + + try { + double runLoopBusy = networkMetrics.getDouble("PriorityBusy1"); + statusObj["run_loop_busy"] = runLoopBusy / networkMetricsElapsed; + } + catch(Error &e) { + // This should only happen very early in the process lifetime before priority bin info has been populated + incomplete_reasons->insert("Cannot retrieve run loop busyness."); + } + } catch (Error& e){ // Something strange occurred, process list is incomplete but what was built so far, if anything, will be returned. @@ -1905,6 +1918,7 @@ ACTOR Future clusterGetStatus( std::vector< Future< Optional >> > > futures; futures.push_back(latestEventOnWorkers(workers, "MachineMetrics")); futures.push_back(latestEventOnWorkers(workers, "ProcessMetrics")); + futures.push_back(latestEventOnWorkers(workers, "NetworkMetrics")); futures.push_back(latestErrorOnWorkers(workers)); futures.push_back(latestEventOnWorkers(workers, "TraceFileOpenError")); futures.push_back(latestEventOnWorkers(workers, "ProgramStart")); @@ -1944,9 +1958,10 @@ ACTOR Future clusterGetStatus( state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents(); // process metrics state WorkerEvents pMetrics = workerEventsVec[1].present() ? workerEventsVec[1].get().first : WorkerEvents(); - state WorkerEvents latestError = workerEventsVec[2].present() ? workerEventsVec[2].get().first : WorkerEvents(); - state WorkerEvents traceFileOpenErrors = workerEventsVec[3].present() ? workerEventsVec[3].get().first : WorkerEvents(); - state WorkerEvents programStarts = workerEventsVec[4].present() ? workerEventsVec[4].get().first : WorkerEvents(); + state WorkerEvents networkMetrics = workerEventsVec[2].present() ? workerEventsVec[2].get().first : WorkerEvents(); + state WorkerEvents latestError = workerEventsVec[3].present() ? workerEventsVec[3].get().first : WorkerEvents(); + state WorkerEvents traceFileOpenErrors = workerEventsVec[4].present() ? workerEventsVec[4].get().first : WorkerEvents(); + state WorkerEvents programStarts = workerEventsVec[5].present() ? workerEventsVec[5].get().first : WorkerEvents(); state JsonBuilderObject statusObj; if(db->get().recoveryCount > 0) { @@ -2089,7 +2104,11 @@ ACTOR Future clusterGetStatus( statusObj["layers"] = layers; } - JsonBuilderObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, proxies, cx, configuration, loadResult.present() ? loadResult.get().healthyZone : Optional(), &status_incomplete_reasons)); + JsonBuilderObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, networkMetrics, + latestError, traceFileOpenErrors, programStarts, + processIssues, storageServers, tLogs, proxies, cx, + configuration, loadResult.present() ? loadResult.get().healthyZone : Optional(), + &status_incomplete_reasons)); statusObj["processes"] = processStatus; statusObj["clients"] = clientStatusFetcher(clientVersionMap, clientStatusInfoMap); diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 2dcf9783ed..ef6426936d 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -168,7 +168,6 @@ public: double lastPriorityTrackTime; int lastMinTaskID; - double priorityTimer[NetworkMetrics::PRIORITY_BINS]; std::priority_queue> ready; ThreadSafeQueue threadReady; @@ -577,7 +576,8 @@ void Net2::run() { if (runFunc) { tsc_begin = __rdtsc(); - taskBegin = timer_monotonic(); + taskBegin = nnow; + trackMinPriority(TaskRunCycleFunction, taskBegin); runFunc(); checkForSlowTask(tsc_begin, __rdtsc(), timer_monotonic() - taskBegin, TaskRunCycleFunction); } @@ -591,8 +591,11 @@ void Net2::run() { ++countWontSleep; if (b) { sleepTime = 1e99; - if (!timers.empty()) - sleepTime = timers.top().at - timer_monotonic(); // + 500e-6? + double sleepStart = timer_monotonic(); + if (!timers.empty()) { + sleepTime = timers.top().at - sleepStart; // + 500e-6? + } + trackMinPriority(0, sleepStart); } awakeMetric = false; @@ -607,7 +610,6 @@ void Net2::run() { if ((now-nnow) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && nondeterministicRandom()->random01() < (now-nnow)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE) TraceEvent("SomewhatSlowRunLoopTop").detail("Elapsed", now - nnow); - if (sleepTime) trackMinPriority( 0, now ); while (!timers.empty() && timers.top().at < now) { ++countTimers; ready.push( timers.top() ); @@ -641,7 +643,7 @@ void Net2::run() { if (check_yield(TaskMaxPriority, true)) { ++countYields; break; } } - nnow = timer_monotonic(); + trackMinPriority(minTaskID, now); #if defined(__linux__) if(FLOW_KNOBS->SLOWTASK_PROFILING_INTERVAL > 0) { @@ -685,11 +687,10 @@ void Net2::run() { net2liveness.fetch_add(1); } #endif + nnow = timer_monotonic(); if ((nnow-now) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && nondeterministicRandom()->random01() < (nnow-now)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE) TraceEvent("SomewhatSlowRunLoopBottom").detail("Elapsed", nnow - now); // This includes the time spent running tasks - - trackMinPriority( minTaskID, nnow ); } #ifdef WIN32 @@ -698,17 +699,22 @@ void Net2::run() { } void Net2::trackMinPriority( int minTaskID, double now ) { - if (minTaskID != lastMinTaskID) + if (minTaskID != lastMinTaskID) { for(int c=0; c= minTaskID && pri < lastMinTaskID) { // busy -> idle - double busyFor = lastPriorityTrackTime - priorityTimer[c]; - networkMetrics.secSquaredPriorityBlocked[c] += busyFor*busyFor; + if (pri > minTaskID && pri <= lastMinTaskID) { // busy -> idle + double busyFor = lastPriorityTrackTime - networkMetrics.priorityTimer[c]; + networkMetrics.priorityBlocked[c] = false; + networkMetrics.priorityBlockedDuration[c] += busyFor; + networkMetrics.secSquaredPriorityBlocked[c] += busyFor * busyFor; } - if (pri < minTaskID && pri >= lastMinTaskID) { // idle -> busy - priorityTimer[c] = now; + if (pri <= minTaskID && pri > lastMinTaskID) { // idle -> busy + networkMetrics.priorityBlocked[c] = true; + networkMetrics.priorityTimer[c] = now; } } + } + lastMinTaskID = minTaskID; lastPriorityTrackTime = now; } diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp index fc778717e9..4481cba0f0 100644 --- a/flow/SystemMonitor.cpp +++ b/flow/SystemMonitor.cpp @@ -59,8 +59,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta netData.init(); if (!DEBUG_DETERMINISM && currentStats.initialized) { { - TraceEvent e(eventName.c_str()); - e + TraceEvent(eventName.c_str()) .detail("Elapsed", currentStats.elapsed) .detail("CPUSeconds", currentStats.processCPUSeconds) .detail("MainThreadCPUSeconds", currentStats.mainThreadCPUSeconds) @@ -120,6 +119,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta TraceEvent n("NetworkMetrics"); n + .detail("Elapsed", currentStats.elapsed) .detail("CantSleep", netData.countCantSleep - statState->networkState.countCantSleep) .detail("WontSleep", netData.countWontSleep - statState->networkState.countWontSleep) .detail("Yields", netData.countYields - statState->networkState.countYields) @@ -139,12 +139,27 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta .detail("PacketsGenerated", netData.countPacketsGenerated - statState->networkState.countPacketsGenerated) .detail("WouldBlock", netData.countWouldBlock - statState->networkState.countWouldBlock); - for (int i = 0; inetworkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) + for (int i = 0; inetworkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) { n.detail(format("SlowTask%dM", 1 << i).c_str(), c); - for (int i = 0; inetworkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i]) - n.detail(format("S2Pri%d", g_network->networkMetrics.priorityBins[i]).c_str(), x); + } + } + + for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkMetrics.priorityBins[i] != 0; i++) { + if(g_network->networkMetrics.priorityBlocked[i]) { + double lastSegment = std::min(currentStats.elapsed, now() - g_network->networkMetrics.priorityTimer[i]); + g_network->networkMetrics.priorityBlockedDuration[i] += lastSegment; + g_network->networkMetrics.secSquaredPriorityBlocked[i] += lastSegment * lastSegment; + g_network->networkMetrics.priorityTimer[i] = now(); + } + + double blocked = g_network->networkMetrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i]; + double s2Blocked = g_network->networkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i]; + n.detail(format("PriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), blocked); + n.detail(format("SumOfSquaredPriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), s2Blocked); + } + + n.trackLatest("NetworkMetrics"); } if(machineMetrics) { diff --git a/flow/network.h b/flow/network.h index 256bc89b40..55284fb39f 100644 --- a/flow/network.h +++ b/flow/network.h @@ -271,7 +271,10 @@ struct NetworkMetrics { enum { PRIORITY_BINS = 9 }; int priorityBins[ PRIORITY_BINS ]; + bool priorityBlocked[PRIORITY_BINS]; + double priorityBlockedDuration[PRIORITY_BINS]; double secSquaredPriorityBlocked[PRIORITY_BINS]; + double priorityTimer[PRIORITY_BINS]; double oldestAlternativesFailure; double newestAlternativesFailure;