Merge pull request #8137 from xis19/main

Reads and reports cpu.stat
This commit is contained in:
Jingyu Zhou 2022-09-21 15:33:57 -07:00 committed by GitHub
commit a8d3282250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 212 additions and 81 deletions

View File

@ -284,10 +284,20 @@ static JsonBuilderObject getError(const TraceEventFields& errorFields) {
return statusObj;
}
static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
std::vector<WorkerDetails> workers,
Optional<DatabaseConfiguration> configuration,
std::set<std::string>* incomplete_reasons) {
namespace {
void reportCgroupCpuStat(JsonBuilderObject& object, const TraceEventFields& eventFields) {
JsonBuilderObject cgroupCpuStatObj;
cgroupCpuStatObj.setKeyRawNumber("nr_periods", eventFields.getValue("NrPeriods"));
cgroupCpuStatObj.setKeyRawNumber("nr_throttled", eventFields.getValue("NrThrottled"));
cgroupCpuStatObj.setKeyRawNumber("throttled_time", eventFields.getValue("ThrottledTime"));
object["cgroup_cpu_stat"] = cgroupCpuStatObj;
}
JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
std::vector<WorkerDetails> workers,
Optional<DatabaseConfiguration> configuration,
std::set<std::string>* incomplete_reasons) {
JsonBuilderObject machineMap;
double metric;
int failed = 0;
@ -339,6 +349,10 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
memoryObj.setKeyRawNumber("free_bytes", event.getValue("AvailableMemory"));
statusObj["memory"] = memoryObj;
#ifdef __linux__
reportCgroupCpuStat(statusObj, event);
#endif // __linux__
JsonBuilderObject cpuObj;
double cpuSeconds = event.getDouble("CPUSeconds");
double elapsed = event.getDouble("Elapsed");
@ -402,6 +416,8 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
return machineMap;
}
} // anonymous namespace
JsonBuilderObject getLagObject(int64_t versions) {
JsonBuilderObject lag;
lag["versions"] = versions;

View File

@ -3,4 +3,4 @@ add_executable(fdbmonitor ${FDBSERVICE_SRCS})
get_target_property(fdbclient_target_includes fdbclient INCLUDE_DIRECTORIES)
target_link_libraries(fdbmonitor PUBLIC SimpleOpt)
target_include_directories(fdbmonitor PUBLIC "${fdbclient_target_includes}")
target_include_directories(fdbmonitor PUBLIC "${fdbclient_target_includes}")

View File

@ -21,22 +21,9 @@
#ifdef _WIN32
// This has to come as the first include on Win32 for rand_s() to be found
#define _CRT_RAND_S
#include <stdlib.h>
#include <math.h> // For _set_FMA3_enable workaround in platformInit
#endif
#endif // _WIN32
#include <errno.h>
#include "fmt/format.h"
#include "flow/Platform.h"
#include "flow/Platform.actor.h"
#include "flow/Arena.h"
#include "flow/StreamCipher.h"
#include "flow/ScopeExit.h"
#include "flow/Trace.h"
#include "flow/Error.h"
#include "flow/Knobs.h"
#include <algorithm>
#include <iostream>
@ -46,29 +33,46 @@
#include <string>
#include <string_view>
#include <vector>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <boost/format.hpp>
#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/algorithm/string.hpp>
#include <sys/types.h>
#include <time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "flow/UnitTest.h"
#include "fmt/format.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/FaultInjection.h"
#include "flow/Knobs.h"
#include "flow/Platform.actor.h"
#include "flow/ScopeExit.h"
#include "flow/StreamCipher.h"
#include "flow/Trace.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/Util.h"
#ifdef _WIN32
#include <windows.h>
#include <winioctl.h>
#include <io.h>
#include <psapi.h>
#include <stdio.h>
#include <conio.h>
#include <direct.h>
#include <io.h>
#include <math.h> // For _set_FMA3_enable workaround in platformInit
#include <pdh.h>
#include <pdhmsg.h>
#include <processenv.h>
#include <psapi.h>
#include <stdlib.h>
#include <windows.h>
#include <winioctl.h>
#pragma comment(lib, "pdh.lib")
// for SHGetFolderPath
@ -83,19 +87,18 @@
#define CANONICAL_PATH_SEPARATOR '/'
#include <dirent.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <unistd.h>
#include <ftw.h>
#include <pwd.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/statvfs.h> /* Needed for disk capacity */
#if !defined(__aarch64__) && !defined(__powerpc64__)
#include <cpuid.h>
#endif
/* Needed for disk capacity */
#include <sys/statvfs.h>
/* getifaddrs */
#include <sys/socket.h>
#include <ifaddrs.h>
@ -116,7 +119,7 @@
#include <signal.h>
/* Needed for gnu_dev_{major,minor} */
#include <sys/sysmacros.h>
#endif
#endif // __linux__
#ifdef __FreeBSD__
/* Needed for processor affinity */
@ -149,32 +152,31 @@
#include <devstat.h>
#include <kvm.h>
#include <libutil.h>
#endif
#endif // __FreeBSD__
#ifdef __APPLE__
/* Needed for cross-platform 'environ' */
#include <crt_externs.h>
#include <sys/uio.h>
#include <sys/syslimits.h>
#include <mach/mach.h>
#include <mach-o/dyld.h>
#include <sys/param.h>
#include <sys/mount.h>
#include <sys/sysctl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <mach/mach.h>
#include <net/if_dl.h>
#include <net/if.h>
#include <net/route.h>
#include <netinet/in.h>
#include <sys/mount.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <sys/syslimits.h>
#include <sys/uio.h>
#include <CoreFoundation/CoreFoundation.h>
#include <IOKit/IOKitLib.h>
#include <IOKit/storage/IOBlockStorageDriver.h>
#include <IOKit/storage/IOMedia.h>
#include <IOKit/IOBSD.h>
#endif
#endif // __APPLE_
#endif
#endif // __unixish__
#include "flow/actorcompiler.h" // This must be the last #include.
@ -408,25 +410,24 @@ uint64_t getMemoryUsage() {
#endif
}
#if defined(__linux__)
#ifdef __linux__
namespace linux_os {
namespace {
void getMemoryInfo(std::map<StringRef, int64_t>& request, std::stringstream& memInfoStream) {
size_t count = request.size();
if (count == 0)
return;
while (count > 0 && !memInfoStream.eof()) {
std::string key;
memInfoStream >> key;
keyValueReader<std::string, int64_t>(memInfoStream, [&](const std::string& key, const int64_t& value) -> bool {
auto item = request.find(StringRef(key));
if (item != request.end()) {
int64_t value;
memInfoStream >> value;
if (item != std::end(request)) {
item->second = value;
count--;
--count;
}
memInfoStream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
return count != 0;
});
}
int64_t getLowWatermark(std::stringstream& zoneInfoStream) {
@ -446,10 +447,8 @@ int64_t getLowWatermark(std::stringstream& zoneInfoStream) {
return lowWatermark;
}
#endif
void getMachineRAMInfo(MachineRAMInfo& memInfo) {
#if defined(__linux__)
void getMachineRAMInfoImpl(MachineRAMInfo& memInfo) {
std::ifstream zoneInfoFileStream("/proc/zoneinfo", std::ifstream::in);
int64_t lowWatermark = 0;
if (!zoneInfoFileStream.good()) {
@ -492,6 +491,35 @@ void getMachineRAMInfo(MachineRAMInfo& memInfo) {
}
memInfo.committed = memInfo.total - memInfo.available;
}
} // anonymous namespace
std::map<std::string, int64_t> reportCGroupCpuStat() {
// Default path to the cpu,cpuacct
// See manpages for cgroup
static const std::string PATH_TO_CPU_CPUACCT = "/sys/fs/cgroup/cpu,cpuacct/cpu.stat";
std::map<std::string, int64_t> result;
std::ifstream ifs(PATH_TO_CPU_CPUACCT);
if (!ifs.is_open()) {
return result;
}
keyValueReader<std::string, int64_t>(ifs, [&](const std::string& key, const int64_t& value) -> bool {
result[key] = value;
return true;
});
return result;
}
} // namespace linux_os
#endif // #ifdef __linux__
void getMachineRAMInfo(MachineRAMInfo& memInfo) {
#if defined(__linux__)
linux_os::getMachineRAMInfoImpl(memInfo);
#elif defined(__FreeBSD__)
int status;
@ -4001,14 +4029,14 @@ TEST_CASE("/flow/Platform/getMemoryInfo") {
};
std::stringstream memInfoStream(memString);
getMemoryInfo(request, memInfoStream);
ASSERT(request["MemTotal:"_sr] == 24733228);
ASSERT(request["MemFree:"_sr] == 2077580);
ASSERT(request["MemAvailable:"_sr] == 0);
ASSERT(request["Buffers:"_sr] == 266940);
ASSERT(request["Cached:"_sr] == 16798292);
ASSERT(request["SwapTotal:"_sr] == 25165820);
ASSERT(request["SwapFree:"_sr] == 23680228);
linux_os::getMemoryInfo(request, memInfoStream);
ASSERT(request[LiteralStringRef("MemTotal:")] == 24733228);
ASSERT(request[LiteralStringRef("MemFree:")] == 2077580);
ASSERT(request[LiteralStringRef("MemAvailable:")] == 0);
ASSERT(request[LiteralStringRef("Buffers:")] == 266940);
ASSERT(request[LiteralStringRef("Cached:")] == 16798292);
ASSERT(request[LiteralStringRef("SwapTotal:")] == 25165820);
ASSERT(request[LiteralStringRef("SwapFree:")] == 23680228);
for (auto& item : request) {
fmt::print("{}:{}\n", item.first.toString().c_str(), item.second);
}
@ -4052,14 +4080,14 @@ TEST_CASE("/flow/Platform/getMemoryInfo") {
"AnonHugePages: 1275904 kB\n";
std::stringstream memInfoStream1(memString1);
getMemoryInfo(request, memInfoStream1);
ASSERT(request["MemTotal:"_sr] == 31856496);
ASSERT(request["MemFree:"_sr] == 25492716);
ASSERT(request["MemAvailable:"_sr] == 28470756);
ASSERT(request["Buffers:"_sr] == 313644);
ASSERT(request["Cached:"_sr] == 2956444);
ASSERT(request["SwapTotal:"_sr] == 0);
ASSERT(request["SwapFree:"_sr] == 0);
linux_os::getMemoryInfo(request, memInfoStream1);
ASSERT(request[LiteralStringRef("MemTotal:")] == 31856496);
ASSERT(request[LiteralStringRef("MemFree:")] == 25492716);
ASSERT(request[LiteralStringRef("MemAvailable:")] == 28470756);
ASSERT(request[LiteralStringRef("Buffers:")] == 313644);
ASSERT(request[LiteralStringRef("Cached:")] == 2956444);
ASSERT(request[LiteralStringRef("SwapTotal:")] == 0);
ASSERT(request[LiteralStringRef("SwapFree:")] == 0);
for (auto& item : request) {
fmt::print("{}:{}\n", item.first.toString().c_str(), item.second);
}

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <fstream>
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "flow/Platform.h"
@ -67,6 +69,35 @@ SystemStatistics getSystemStatistics() {
.detail("ApproximateUnusedMemory" #size, FastAllocator<size>::getApproximateMemoryUnused()) \
.detail("ActiveThreads" #size, FastAllocator<size>::getActiveThreads())
namespace {
#ifdef __linux__
// Converts cgroup key, e.g. nr_periods, to NrPeriods
std::string capitalizeCgroupKey(const std::string& key) {
bool wordStart = true;
std::string result;
result.reserve(key.size());
for (const char ch : key) {
if (std::isalnum(ch)) {
if (wordStart) {
result.push_back(std::toupper(ch));
wordStart = false;
} else {
result.push_back(ch);
}
} else {
// Skip non-alnum characters
wordStart = true;
}
}
return result;
}
#endif // __linux__
} // anonymous namespace
SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsState* statState, bool machineMetrics) {
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
SystemStatistics currentStats = getSystemStatistics(
@ -284,8 +315,8 @@ SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsSta
}
if (machineMetrics) {
TraceEvent("MachineMetrics")
.detail("Elapsed", currentStats.elapsed)
auto traceEvent = TraceEvent("MachineMetrics");
traceEvent.detail("Elapsed", currentStats.elapsed)
.detail("MbpsSent", currentStats.machineMegabitsSent / currentStats.elapsed)
.detail("MbpsReceived", currentStats.machineMegabitsReceived / currentStats.elapsed)
.detail("OutSegs", currentStats.machineOutSegs)
@ -298,6 +329,11 @@ SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsSta
.detail("ZoneID", machineState.zoneId)
.detail("MachineID", machineState.machineId)
.trackLatest("MachineMetrics");
#ifdef __linux__
for (const auto& [k, v] : linux_os::reportCGroupCpuStat()) {
traceEvent.detail(capitalizeCgroupKey(k).c_str(), v);
}
#endif // __linux__
}
}

View File

@ -143,6 +143,7 @@ inline static T& makeDependent(T& value) {
return value;
}
#include <map>
#include <string>
#include <vector>
@ -244,6 +245,16 @@ double getProcessorTimeThread();
double getProcessorTimeProcess();
#ifdef __linux__
namespace linux_os {
// Collects the /sys/fs/cgroup/cpu,cpuacct/cpu.stat information and returns the content
// For more information about cpu,cpuacct, check manpages for cgroup
std::map<std::string, int64_t> reportCGroupCpuStat();
} // namespace linux_os
#endif // __linux__
uint64_t getMemoryUsage();
uint64_t getResidentMemoryUsage();
@ -316,6 +327,7 @@ void renameFile(std::string const& fromPath, std::string const& toPath);
void atomicReplace(std::string const& path, std::string const& content, bool textmode = true);
// Read a file into memory
// This requires the file to be seekable
std::string readFileBytes(std::string const& filename, int maxSize);
// Read a file into memory supplied by the caller
@ -772,7 +784,7 @@ int64_t getNumProfilesDeferred();
int64_t getNumProfilesOverflowed();
int64_t getNumProfilesCaptured();
#else
#else // __cplusplus
#define EXTERNC
#endif // __cplusplus

View File

@ -20,9 +20,48 @@
#ifndef _FLOW_UTIL_H_
#define _FLOW_UTIL_H_
#pragma once
#include <algorithm>
#include <functional>
#include <iosfwd>
// Read key/value pairs from stream. The stream is constituted by lines of text.
// Each line contains a pair of key/value, separated by space/tab. e.g.
//
// Key1 Value1 tailing characters
// Key2 Value2 tailing characters
//
// The tailing characters will be ignored.
//
// K and V should have
//
// std::istream& operator>>(std::istream&, K&);
//
// implemented.
template <typename K, typename V>
void keyValueReader(std::istream& stream, std::function<bool(const K&, const V&)> consumer) {
std::stringstream lineParser;
std::string line;
K key;
V value;
while (std::getline(stream, line)) {
lineParser.clear();
lineParser.str(std::move(line));
try {
lineParser >> key >> value;
} catch (std::ios_base::failure&) {
continue;
}
if (lineParser.fail() || lineParser.bad()) {
continue;
}
if (!consumer(key, value)) {
break;
}
}
}
template <typename C>
void swapAndPop(C* container, int index) {