Balance available memory based on the limits set for each process. Don't report more available memory than the limit.

This commit is contained in:
A.J. Beamon 2020-05-14 15:49:59 -07:00
parent 6a75ac4566
commit b49eb0f67a
1 changed files with 16 additions and 13 deletions

View File

@ -377,9 +377,9 @@ JsonBuilderObject getLagObject(int64_t versions) {
struct MachineMemoryInfo { struct MachineMemoryInfo {
double memoryUsage; double memoryUsage;
double numProcesses; double aggregateLimit;
MachineMemoryInfo() : memoryUsage(0), numProcesses(0) {} MachineMemoryInfo() : memoryUsage(0), aggregateLimit(0) {}
bool valid() { return memoryUsage >= 0; } bool valid() { return memoryUsage >= 0; }
void invalidate() { memoryUsage = -1; } void invalidate() { memoryUsage = -1; }
@ -590,11 +590,12 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
try { try {
ASSERT(pMetrics.count(workerItr->interf.address())); ASSERT(pMetrics.count(workerItr->interf.address()));
const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()]; const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()];
const TraceEventFields& programStart = programStarts[workerItr->interf.address()];
if(memInfo->second.valid()) { if(memInfo->second.valid()) {
if(processMetrics.size() > 0) { if(processMetrics.size() > 0 && programStart.size() > 0) {
memInfo->second.memoryUsage += processMetrics.getDouble("Memory"); memInfo->second.memoryUsage += processMetrics.getDouble("Memory");
++memInfo->second.numProcesses; memInfo->second.aggregateLimit += programStart.getDouble("MemoryLimit");
} }
else else
memInfo->second.invalidate(); memInfo->second.invalidate();
@ -766,19 +767,21 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
memoryObj.setKeyRawNumber("unused_allocated_memory", processMetrics.getValue("UnusedAllocatedMemory")); memoryObj.setKeyRawNumber("unused_allocated_memory", processMetrics.getValue("UnusedAllocatedMemory"));
} }
int64_t memoryLimit = 0;
if (programStarts.count(address)) { if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address); auto const& programStartEvent = programStarts.at(address);
if(psxml.size() > 0) { if(programStartEvent.size() > 0) {
memoryObj.setKeyRawNumber("limit_bytes",psxml.getValue("MemoryLimit")); memoryLimit = programStartEvent.getInt64("MemoryLimit");
memoryObj.setKey("limit_bytes", memoryLimit);
std::string version; std::string version;
if (psxml.tryGetValue("Version", version)) { if (programStartEvent.tryGetValue("Version", version)) {
statusObj["version"] = version; statusObj["version"] = version;
} }
std::string commandLine; std::string commandLine;
if (psxml.tryGetValue("CommandLine", commandLine)) { if (programStartEvent.tryGetValue("CommandLine", commandLine)) {
statusObj["command_line"] = commandLine; statusObj["command_line"] = commandLine;
} }
} }
@ -790,10 +793,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
availableMemory = mMetrics[address].getDouble("AvailableMemory"); availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->interf.locality.machineId()]; auto machineMemInfo = machineMemoryUsage[workerItr->interf.locality.machineId()];
if (machineMemInfo.valid()) { if (machineMemInfo.valid() && memoryLimit > 0) {
ASSERT(machineMemInfo.numProcesses > 0); ASSERT(machineMemInfo.aggregateLimit > 0);
int64_t memory = (availableMemory + machineMemInfo.memoryUsage) / machineMemInfo.numProcesses; int64_t memory = (availableMemory + machineMemInfo.memoryUsage) * memoryLimit / machineMemInfo.aggregateLimit;
memoryObj["available_bytes"] = std::max<int64_t>(memory, 0); memoryObj["available_bytes"] = std::min<int64_t>(std::max<int64_t>(memory, 0), memoryLimit);
} }
} }