Merge pull request #4254 from sfc-gh-tclinkenbeard/capture-forked-process-output
Capture forked process output in traces
This commit is contained in:
commit
2d84e5e1f1
|
@ -78,18 +78,37 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
|
|||
}
|
||||
#else
|
||||
|
||||
pid_t fork_child(const std::string& path,
|
||||
std::vector<char*>& paramList)
|
||||
{
|
||||
static auto fork_child(const std::string& path, std::vector<char*>& paramList) {
|
||||
int pipefd[2];
|
||||
pipe(pipefd);
|
||||
auto readFD = pipefd[0];
|
||||
auto writeFD = pipefd[1];
|
||||
pid_t pid = fork();
|
||||
if (pid == -1) {
|
||||
return -1;
|
||||
close(readFD);
|
||||
close(writeFD);
|
||||
return std::make_pair(-1, Optional<int>{});
|
||||
}
|
||||
if (pid == 0) {
|
||||
execv(const_cast<char*>(path.c_str()), ¶mList[0]);
|
||||
close(readFD);
|
||||
dup2(writeFD, 1); // stdout
|
||||
dup2(writeFD, 2); // stderr
|
||||
close(writeFD);
|
||||
execv(&path[0], ¶mList[0]);
|
||||
_exit(EXIT_FAILURE);
|
||||
}
|
||||
return pid;
|
||||
close(writeFD);
|
||||
return std::make_pair(pid, Optional<int>{ readFD });
|
||||
}
|
||||
|
||||
static void setupTraceWithOutput(TraceEvent& event, size_t bytesRead, char* outputBuffer) {
|
||||
if (bytesRead == 0) return;
|
||||
ASSERT(bytesRead <= SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT);
|
||||
auto extraBytesNeeded = std::max<int>(bytesRead - event.getMaxFieldLength(), 0);
|
||||
event.setMaxFieldLength(event.getMaxFieldLength() + extraBytesNeeded);
|
||||
event.setMaxEventLength(event.getMaxEventLength() + extraBytesNeeded);
|
||||
outputBuffer[bytesRead - 1] = '\0';
|
||||
event.detail("Output", std::string(outputBuffer));
|
||||
}
|
||||
|
||||
ACTOR Future<int> spawnProcess(std::string path, std::vector<std::string> args, double maxWaitTime, bool isSync, double maxSimDelayTime)
|
||||
|
@ -107,16 +126,19 @@ ACTOR Future<int> spawnProcess(std::string path, std::vector<std::string> args,
|
|||
|
||||
std::vector<char*> paramList;
|
||||
for (int i = 0; i < args.size(); i++) {
|
||||
paramList.push_back(const_cast<char*>(args[i].c_str()));
|
||||
paramList.push_back(&args[i][0]);
|
||||
}
|
||||
paramList.push_back(nullptr);
|
||||
|
||||
state std::string allArgs;
|
||||
for (int i = 0; i < args.size(); i++) {
|
||||
if (i > 0) allArgs += " ";
|
||||
allArgs += args[i];
|
||||
}
|
||||
|
||||
state pid_t pid = fork_child(path, paramList);
|
||||
state std::pair<pid_t, Optional<int>> pidAndReadFD = fork_child(path, paramList);
|
||||
state pid_t pid = pidAndReadFD.first;
|
||||
state Optional<int> readFD = pidAndReadFD.second;
|
||||
if (pid == -1) {
|
||||
TraceEvent(SevWarnAlways, "SpawnProcess: Command failed to spawn")
|
||||
.detail("Cmd", path)
|
||||
|
@ -125,20 +147,32 @@ ACTOR Future<int> spawnProcess(std::string path, std::vector<std::string> args,
|
|||
} else if (pid > 0) {
|
||||
state int status = -1;
|
||||
state double runTime = 0;
|
||||
state Arena arena;
|
||||
state char* outputBuffer = new (arena) char[SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT];
|
||||
state size_t bytesRead = 0;
|
||||
while (true) {
|
||||
if (runTime > maxWaitTime) {
|
||||
// timing out
|
||||
|
||||
TraceEvent(SevWarnAlways, "SpawnProcess : Command failed, timeout")
|
||||
.detail("Cmd", path)
|
||||
.detail("Args", allArgs);
|
||||
return -1;
|
||||
}
|
||||
int err = waitpid(pid, &status, WNOHANG);
|
||||
loop {
|
||||
int bytes =
|
||||
read(readFD.get(), &outputBuffer[bytesRead], SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT - bytesRead);
|
||||
bytesRead += bytes;
|
||||
if (bytes == 0) break;
|
||||
}
|
||||
|
||||
if (err < 0) {
|
||||
TraceEvent(SevWarnAlways, "SpawnProcess : Command failed")
|
||||
.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1);
|
||||
TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed");
|
||||
setupTraceWithOutput(event, bytesRead, outputBuffer);
|
||||
event.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1);
|
||||
return -1;
|
||||
} else if (err == 0) {
|
||||
// child process has not completed yet
|
||||
|
@ -153,16 +187,18 @@ ACTOR Future<int> spawnProcess(std::string path, std::vector<std::string> args,
|
|||
} else {
|
||||
// child process completed
|
||||
if (!(WIFEXITED(status) && WEXITSTATUS(status) == 0)) {
|
||||
TraceEvent(SevWarnAlways, "SpawnProcess : Command failed")
|
||||
.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1);
|
||||
TraceEvent event(SevWarnAlways, "SpawnProcess : Command failed");
|
||||
setupTraceWithOutput(event, bytesRead, outputBuffer);
|
||||
event.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : -1);
|
||||
return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
|
||||
}
|
||||
TraceEvent("SpawnProcess : Command status")
|
||||
.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0);
|
||||
TraceEvent event("SpawnProcess : Command status");
|
||||
setupTraceWithOutput(event, bytesRead, outputBuffer);
|
||||
event.detail("Cmd", path)
|
||||
.detail("Args", allArgs)
|
||||
.detail("Errno", WIFEXITED(status) ? WEXITSTATUS(status) : 0);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( TLOG_POP_BATCH_SIZE, 1000 ); if ( randomize && BUGGIFY ) TLOG_POP_BATCH_SIZE = 10;
|
||||
|
||||
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
|
||||
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );
|
||||
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
|
||||
|
||||
// Data distribution queue
|
||||
|
|
|
@ -475,6 +475,7 @@ public:
|
|||
double STORAGE_SERVER_LIST_FETCH_TIMEOUT;
|
||||
|
||||
// disk snapshot
|
||||
int64_t MAX_FORKED_PROCESS_OUTPUT;
|
||||
double SNAP_CREATE_MAX_TIMEOUT;
|
||||
|
||||
//Storage Metrics
|
||||
|
|
|
@ -1040,6 +1040,10 @@ TraceEvent& TraceEvent::setMaxFieldLength(int maxFieldLength) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
int TraceEvent::getMaxFieldLength() const {
|
||||
return maxFieldLength;
|
||||
}
|
||||
|
||||
TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) {
|
||||
ASSERT(!logged);
|
||||
if(maxEventLength == 0) {
|
||||
|
@ -1052,6 +1056,10 @@ TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
int TraceEvent::getMaxEventLength() const {
|
||||
return maxEventLength;
|
||||
}
|
||||
|
||||
TraceEvent& TraceEvent::GetLastError() {
|
||||
#ifdef _WIN32
|
||||
return detailf("WinErrorCode", "%x", ::GetLastError());
|
||||
|
|
|
@ -473,10 +473,14 @@ public:
|
|||
// changed multiple times in a single event.
|
||||
TraceEvent& setMaxFieldLength(int maxFieldLength);
|
||||
|
||||
int getMaxFieldLength() const;
|
||||
|
||||
// Sets the maximum event length before the event gets suppressed and a warning is logged. A value of 0 uses the default,
|
||||
// a negative value disables length suppression. This should be called before adding details.
|
||||
TraceEvent& setMaxEventLength(int maxEventLength);
|
||||
|
||||
int getMaxEventLength() const;
|
||||
|
||||
//Cannot call other functions which could disable the trace event afterwords
|
||||
TraceEvent& suppressFor( double duration, bool logSuppressedEventCount=true );
|
||||
|
||||
|
|
Loading…
Reference in New Issue