diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index fe148f1b5a..e17ee8f785 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -78,18 +78,37 @@ ACTOR Future spawnProcess(std::string binPath, std::vector par } #else -pid_t fork_child(const std::string& path, - std::vector& paramList) -{ +static auto fork_child(const std::string& path, std::vector& paramList) { + int pipefd[2]; + pipe(pipefd); + auto readFD = pipefd[0]; + auto writeFD = pipefd[1]; pid_t pid = fork(); if (pid == -1) { - return -1; + close(readFD); + close(writeFD); + return std::make_pair(-1, Optional{}); } if (pid == 0) { - execv(const_cast(path.c_str()), ¶mList[0]); + close(readFD); + dup2(writeFD, 1); // stdout + dup2(writeFD, 2); // stderr + close(writeFD); + execv(&path[0], ¶mList[0]); _exit(EXIT_FAILURE); } - return pid; + close(writeFD); + return std::make_pair(pid, Optional{ readFD }); +} + +static void setupTraceWithOutput(TraceEvent& event, size_t bytesRead, char* outputBuffer) { + if (bytesRead == 0) return; + ASSERT(bytesRead <= SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT); + auto extraBytesNeeded = std::max(bytesRead - event.getMaxFieldLength(), 0); + event.setMaxFieldLength(event.getMaxFieldLength() + extraBytesNeeded); + event.setMaxEventLength(event.getMaxEventLength() + extraBytesNeeded); + outputBuffer[bytesRead - 1] = '\0'; + event.detail("Output", std::string(outputBuffer)); } ACTOR Future spawnProcess(std::string path, std::vector args, double maxWaitTime, bool isSync, double maxSimDelayTime) @@ -107,16 +126,19 @@ ACTOR Future spawnProcess(std::string path, std::vector args, std::vector paramList; for (int i = 0; i < args.size(); i++) { - paramList.push_back(const_cast(args[i].c_str())); + paramList.push_back(&args[i][0]); } paramList.push_back(nullptr); state std::string allArgs; for (int i = 0; i < args.size(); i++) { + if (i > 0) allArgs += " "; allArgs += args[i]; } - state pid_t pid = fork_child(path, paramList); + state std::pair> pidAndReadFD = fork_child(path, paramList); + state pid_t pid = pidAndReadFD.first; + state Optional readFD = pidAndReadFD.second; if (pid == -1) { TraceEvent(SevWarnAlways, "SpawnProcess: Command failed to spawn") .detail("Cmd", path) @@ -125,20 +147,32 @@ ACTOR Future spawnProcess(std::string path, std::vector args, } else if (pid > 0) { state int status = -1; state double runTime = 0; + state Arena arena; + state char* outputBuffer = new (arena) char[SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT]; + state size_t bytesRead = 0; while (true) { if (runTime > maxWaitTime) { // timing out + TraceEvent(SevWarnAlways, "SpawnProcess : Command failed, timeout") .detail("Cmd", path) .detail("Args", allArgs); return -1; } int err = waitpid(pid, &status, WNOHANG); + loop { + int bytes = + read(readFD.get(), &outputBuffer[bytesRead], SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT - bytesRead); + bytesRead += bytes; + if (bytes == 0) break; + } + if (err < 0) { - TraceEvent(SevWarnAlways, "SpawnProcess : Command failed") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); + TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); return -1; } else if (err == 0) { // child process has not completed yet @@ -153,16 +187,18 @@ ACTOR Future spawnProcess(std::string path, std::vector args, } else { // child process completed if (!(WIFEXITED(status) && WEXITSTATUS(status) == 0)) { - TraceEvent(SevWarnAlways, "SpawnProcess : Command failed") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); + TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1); return WIFEXITED(status) ? WEXITSTATUS(status) : -1; } - TraceEvent("SpawnProcess : Command status") - .detail("Cmd", path) - .detail("Args", allArgs) - .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0); + TraceEvent event("SpawnProcess : Command status"); + setupTraceWithOutput(event, bytesRead, outputBuffer); + event.detail("Cmd", path) + .detail("Args", allArgs) + .detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0); return 0; } } diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index a995271645..350c8a72cc 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -104,6 +104,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( TLOG_POP_BATCH_SIZE, 1000 ); if ( randomize && BUGGIFY ) TLOG_POP_BATCH_SIZE = 10; // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes + init( MAX_FORKED_PROCESS_OUTPUT, 1024 ); init( SNAP_CREATE_MAX_TIMEOUT, 300.0 ); // Data distribution queue diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 58eb105205..e1a0beae27 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -475,6 +475,7 @@ public: double STORAGE_SERVER_LIST_FETCH_TIMEOUT; // disk snapshot + int64_t MAX_FORKED_PROCESS_OUTPUT; double SNAP_CREATE_MAX_TIMEOUT; //Storage Metrics diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 1e30ea570d..8f7b666039 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -1040,6 +1040,10 @@ TraceEvent& TraceEvent::setMaxFieldLength(int maxFieldLength) { return *this; } +int TraceEvent::getMaxFieldLength() const { + return maxFieldLength; +} + TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) { ASSERT(!logged); if(maxEventLength == 0) { @@ -1052,6 +1056,10 @@ TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) { return *this; } +int TraceEvent::getMaxEventLength() const { + return maxEventLength; +} + TraceEvent& TraceEvent::GetLastError() { #ifdef _WIN32 return detailf("WinErrorCode", "%x", ::GetLastError()); diff --git a/flow/Trace.h b/flow/Trace.h index 3dde5cc9ac..d9af41122d 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -473,10 +473,14 @@ public: // changed multiple times in a single event. TraceEvent& setMaxFieldLength(int maxFieldLength); + int getMaxFieldLength() const; + // Sets the maximum event length before the event gets suppressed and a warning is logged. A value of 0 uses the default, // a negative value disables length suppression. This should be called before adding details. TraceEvent& setMaxEventLength(int maxEventLength); + int getMaxEventLength() const; + //Cannot call other functions which could disable the trace event afterwords TraceEvent& suppressFor( double duration, bool logSuppressedEventCount=true );