Merge pull request #3174 from ajbeamon/process-available-memory-balancing

Balance available memory based on the limits set for each process.
This commit is contained in:
Evan Tschannen 2020-05-20 14:20:11 -07:00 committed by GitHub
commit 87350e1bf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 13 deletions

View File

@ -377,9 +377,9 @@ JsonBuilderObject getLagObject(int64_t versions) {
struct MachineMemoryInfo {
double memoryUsage;
double numProcesses;
double aggregateLimit;
MachineMemoryInfo() : memoryUsage(0), numProcesses(0) {}
MachineMemoryInfo() : memoryUsage(0), aggregateLimit(0) {}
bool valid() { return memoryUsage >= 0; }
void invalidate() { memoryUsage = -1; }
@ -613,11 +613,12 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
try {
ASSERT(pMetrics.count(workerItr->interf.address()));
const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()];
const TraceEventFields& programStart = programStarts[workerItr->interf.address()];
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
if(processMetrics.size() > 0 && programStart.size() > 0) {
memInfo->second.memoryUsage += processMetrics.getDouble("Memory");
++memInfo->second.numProcesses;
memInfo->second.aggregateLimit += programStart.getDouble("MemoryLimit");
}
else
memInfo->second.invalidate();
@ -789,19 +790,21 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
memoryObj.setKeyRawNumber("unused_allocated_memory", processMetrics.getValue("UnusedAllocatedMemory"));
}
int64_t memoryLimit = 0;
if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address);
auto const& programStartEvent = programStarts.at(address);
if(psxml.size() > 0) {
memoryObj.setKeyRawNumber("limit_bytes",psxml.getValue("MemoryLimit"));
if(programStartEvent.size() > 0) {
memoryLimit = programStartEvent.getInt64("MemoryLimit");
memoryObj.setKey("limit_bytes", memoryLimit);
std::string version;
if (psxml.tryGetValue("Version", version)) {
if (programStartEvent.tryGetValue("Version", version)) {
statusObj["version"] = version;
}
std::string commandLine;
if (psxml.tryGetValue("CommandLine", commandLine)) {
if (programStartEvent.tryGetValue("CommandLine", commandLine)) {
statusObj["command_line"] = commandLine;
}
}
@ -813,10 +816,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->interf.locality.machineId()];
if (machineMemInfo.valid()) {
ASSERT(machineMemInfo.numProcesses > 0);
int64_t memory = (availableMemory + machineMemInfo.memoryUsage) / machineMemInfo.numProcesses;
memoryObj["available_bytes"] = std::max<int64_t>(memory, 0);
if (machineMemInfo.valid() && memoryLimit > 0) {
ASSERT(machineMemInfo.aggregateLimit > 0);
int64_t memory = (availableMemory + machineMemInfo.memoryUsage) * memoryLimit / machineMemInfo.aggregateLimit;
memoryObj["available_bytes"] = std::min<int64_t>(std::max<int64_t>(memory, 0), memoryLimit);
}
}