From acac02587db1d019a20072a8d2a72e9613e313a8 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 29 Jan 2021 01:31:26 -0800 Subject: [PATCH 1/2] Trace output from forked processes --- fdbserver/FDBExecHelper.actor.cpp | 71 +++++++++++++++++++++++-------- fdbserver/Knobs.cpp | 1 + fdbserver/Knobs.h | 1 + flow/Trace.cpp | 8 ++++ flow/Trace.h | 4 ++ 5 files changed, 67 insertions(+), 18 deletions(-) diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index fe148f1b5a..b4984e901a 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -78,20 +78,39 @@ ACTOR Future spawnProcess(std::string binPath, std::vector par } #else -pid_t fork_child(const std::string& path, - std::vector& paramList) -{ +pid_t fork_child(const std::string& path, std::vector& paramList, int& readFD) { + int pipefd[2]; + pipe(pipefd); + readFD = pipefd[0]; + auto writeFD = pipefd[1]; pid_t pid = fork(); if (pid == -1) { + close(readFD); + close(writeFD); return -1; } if (pid == 0) { - execv(const_cast(path.c_str()), ¶mList[0]); + close(readFD); + dup2(writeFD, 1); // stdout + dup2(writeFD, 2); // stderr + close(writeFD); + execv(&path[0], ¶mList[0]); _exit(EXIT_FAILURE); } + close(writeFD); return pid; } +static void setupTraceWithOutput(TraceEvent& event, size_t bytesRead, char* outputBuffer) { + if (bytesRead == 0) return; + ASSERT(bytesRead <= SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT); + auto extraBytesNeeded = std::max(bytesRead - event.getMaxFieldLength(), 0); + event.setMaxFieldLength(event.getMaxFieldLength() + extraBytesNeeded); + event.setMaxEventLength(event.getMaxEventLength() + extraBytesNeeded); + outputBuffer[bytesRead - 1] = '\0'; + event.detail("Output", std::string(outputBuffer)); +} + ACTOR Future spawnProcess(std::string path, std::vector args, double maxWaitTime, bool isSync, double maxSimDelayTime) { // for async calls in simulator, always delay by a deterministic amount of time and then @@ -107,16 +126,18 @@ ACTOR Future spawnProcess(std::string path, std::vector args, std::vector paramList; for (int i = 0; i < args.size(); i++) { - paramList.push_back(const_cast(args[i].c_str())); + paramList.push_back(&args[i][0]); } paramList.push_back(nullptr); state std::string allArgs; for (int i = 0; i < args.size(); i++) { + if (i > 0) allArgs += " "; allArgs += args[i]; } - state pid_t pid = fork_child(path, paramList); + state int forkedProcessOutputFD; + state pid_t pid = fork_child(path, paramList, forkedProcessOutputFD); if (pid == -1) { TraceEvent(SevWarnAlways, "SpawnProcess: Command failed to spawn") .detail("Cmd", path) @@ -125,20 +146,32 @@ ACTOR Future spawnProcess(std::string path, std::vector args, } else if (pid > 0) { state int status = -1; state double runTime = 0; + state Arena arena; + state char* outputBuffer = new (arena) char[SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT]; + state size_t bytesRead = 0; while (true) { if (runTime > maxWaitTime) { // timing out + TraceEvent(SevWarnAlways, "SpawnProcess : Command failed, timeout") .detail("Cmd", path) .detail("Args", allArgs); return -1; } int err = waitpid(pid, &status, WNOHANG); + loop { + int bytes = read(forkedProcessOutputFD, &outputBuffer[bytesRead], + SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT - bytesRead); + bytesRead += bytes; + if (bytes == 0) break; + } + if (err < 0) { - TraceEvent(SevWarnAlways, "SpawnProcess : Command failed") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); + TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); return -1; } else if (err == 0) { // child process has not completed yet @@ -153,16 +186,18 @@ ACTOR Future spawnProcess(std::string path, std::vector args, } else { // child process completed if (!(WIFEXITED(status) && WEXITSTATUS(status) == 0)) { - TraceEvent(SevWarnAlways, "SpawnProcess : Command failed") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); + TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); return WIFEXITED(status) ? WEXITSTATUS(status) : -1; } - TraceEvent("SpawnProcess : Command status") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0); + TraceEvent event("SpawnProcess : Command status"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0); return 0; } } diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index ec6a0dfe70..370dd2e7d2 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -103,6 +103,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( TLOG_POP_BATCH_SIZE, 1000 ); if ( randomize && BUGGIFY ) TLOG_POP_BATCH_SIZE = 10; // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes + init( MAX_FORKED_PROCESS_OUTPUT, 1024 ); init( SNAP_CREATE_MAX_TIMEOUT, 300.0 ); // Data distribution queue diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 51567f6fee..6432968756 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -464,6 +464,7 @@ public: double STORAGE_SERVER_LIST_FETCH_TIMEOUT; // disk snapshot + int64_t MAX_FORKED_PROCESS_OUTPUT; double SNAP_CREATE_MAX_TIMEOUT; //Storage Metrics diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 2c36604e6b..306a068623 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -1040,6 +1040,10 @@ TraceEvent& TraceEvent::setMaxFieldLength(int maxFieldLength) { return *this; } +int TraceEvent::getMaxFieldLength() const { + return maxFieldLength; +} + TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) { ASSERT(!logged); if(maxEventLength == 0) { @@ -1052,6 +1056,10 @@ TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) { return *this; } +int TraceEvent::getMaxEventLength() const { + return maxEventLength; +} + TraceEvent& TraceEvent::GetLastError() { #ifdef _WIN32 return detailf("WinErrorCode", "%x", ::GetLastError()); diff --git a/flow/Trace.h b/flow/Trace.h index 5137d99a68..e6b1f04268 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -474,10 +474,14 @@ public: // changed multiple times in a single event. TraceEvent& setMaxFieldLength(int maxFieldLength); + int getMaxFieldLength() const; + // Sets the maximum event length before the event gets suppressed and a warning is logged. A value of 0 uses the default, // a negative value disables length suppression. This should be called before adding details. TraceEvent& setMaxEventLength(int maxEventLength); + int getMaxEventLength() const; + //Cannot call other functions which could disable the trace event afterwords TraceEvent& suppressFor( double duration, bool logSuppressedEventCount=true ); From f9aba7064d2bf7baa0faa698e2a940bf57c7b2e3 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 29 Jan 2021 16:27:48 -0800 Subject: [PATCH 2/2] Use consistent return method for fork_child --- fdbserver/FDBExecHelper.actor.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index b4984e901a..e17ee8f785 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -78,16 +78,16 @@ ACTOR Future spawnProcess(std::string binPath, std::vector par } #else -pid_t fork_child(const std::string& path, std::vector& paramList, int& readFD) { +static auto fork_child(const std::string& path, std::vector& paramList) { int pipefd[2]; pipe(pipefd); - readFD = pipefd[0]; + auto readFD = pipefd[0]; auto writeFD = pipefd[1]; pid_t pid = fork(); if (pid == -1) { close(readFD); close(writeFD); - return -1; + return std::make_pair(-1, Optional{}); } if (pid == 0) { close(readFD); @@ -98,7 +98,7 @@ pid_t fork_child(const std::string& path, std::vector& paramList, int& re _exit(EXIT_FAILURE); } close(writeFD); - return pid; + return std::make_pair(pid, Optional{ readFD }); } static void setupTraceWithOutput(TraceEvent& event, size_t bytesRead, char* outputBuffer) { @@ -136,8 +136,9 @@ ACTOR Future spawnProcess(std::string path, std::vector args, allArgs += args[i]; } - state int forkedProcessOutputFD; - state pid_t pid = fork_child(path, paramList, forkedProcessOutputFD); + state std::pair> pidAndReadFD = fork_child(path, paramList); + state pid_t pid = pidAndReadFD.first; + state Optional readFD = pidAndReadFD.second; if (pid == -1) { TraceEvent(SevWarnAlways, "SpawnProcess: Command failed to spawn") .detail("Cmd", path) @@ -160,8 +161,8 @@ ACTOR Future spawnProcess(std::string path, std::vector args, } int err = waitpid(pid, &status, WNOHANG); loop { - int bytes = read(forkedProcessOutputFD, &outputBuffer[bytesRead], - SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT - bytesRead); + int bytes = + read(readFD.get(), &outputBuffer[bytesRead], SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT - bytesRead); bytesRead += bytes; if (bytes == 0) break; }