Fix the defunct process issue.
When run multiple processes by 'mpirun', parent process never quit even Exception happens, which caused by MPI_Finalize() never returned.
This commit is contained in:
parent
ddd9121968
commit
be1c49ee79
|
@ -16,7 +16,7 @@
|
|||
|
||||
#include "common/duplex_pipe.h"
|
||||
|
||||
#include <signal.h>
|
||||
#include <sys/wait.h>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
|
@ -70,6 +70,8 @@ int DuplexPipe::Open(std::initializer_list<std::string> arg_list, bool append_fd
|
|||
local_stderr_ = dup(STDERR_FILENO);
|
||||
close(fd1_[0]);
|
||||
close(fd2_[1]);
|
||||
|
||||
signal_handler_ = std::make_shared<SignalHandler>(shared_from_this(), pid_);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -147,14 +149,58 @@ void DuplexPipe::Close() {
|
|||
close(fd2_[1]);
|
||||
}
|
||||
|
||||
void DuplexPipe::Alarm::Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs) {
|
||||
DuplexPipe::SignalHandler::SignalHandler(std::shared_ptr<DuplexPipe> dp, pid_t pid) {
|
||||
dp_ = dp;
|
||||
signal(SIGALRM, SigHandler);
|
||||
child_pid_ = pid;
|
||||
signal(SIGCHLD, SigChildHandler);
|
||||
signal(SIGPIPE, SigPipeHandler);
|
||||
}
|
||||
|
||||
DuplexPipe::SignalHandler::~SignalHandler() { dp_.reset(); }
|
||||
|
||||
void DuplexPipe::SignalHandler::SetAlarm(unsigned int interval_secs) {
|
||||
signal(SIGALRM, SigAlarmHandler);
|
||||
alarm(interval_secs);
|
||||
}
|
||||
|
||||
void DuplexPipe::Alarm::Cancel() {
|
||||
alarm(0);
|
||||
dp_.reset();
|
||||
void DuplexPipe::SignalHandler::CancelAlarm() { alarm(0); }
|
||||
|
||||
void DuplexPipe::SignalHandler::SigAlarmHandler(int sig) {
|
||||
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
||||
if (!dp_.expired()) {
|
||||
dp_.lock()->TimeOut();
|
||||
}
|
||||
}
|
||||
|
||||
void DuplexPipe::SignalHandler::SigPipeHandler(int sig) {
|
||||
DP_INFO << "Signal: " << sig;
|
||||
if (!dp_.expired()) {
|
||||
dp_.lock()->Close();
|
||||
}
|
||||
}
|
||||
|
||||
void DuplexPipe::SignalHandler::SigChildHandler(int sig) {
|
||||
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
||||
int status;
|
||||
auto pid = waitpid(child_pid_, &status, WNOHANG | WUNTRACED);
|
||||
if (WIFEXITED(status)) {
|
||||
DP_ERROR << "Child exited, status: " << WEXITSTATUS(status) << ", pid: " << pid;
|
||||
if (!dp_.expired()) {
|
||||
dp_.lock()->Close();
|
||||
}
|
||||
|
||||
// When run multiple processes by 'mpirun',
|
||||
// parent process never quit even Exception happens,
|
||||
// which caused by MPI_Finalize() never returned.
|
||||
exit(-1);
|
||||
} else if (WIFSTOPPED(status)) {
|
||||
DP_ERROR << "Child stopped, sig: " << WSTOPSIG(status) << ", pid: " << pid;
|
||||
} else if (WIFSIGNALED(status)) {
|
||||
DP_INFO << "Child not exited, signaled, sig: " << WTERMSIG(status) << ", pid: " << pid;
|
||||
} else if (WIFCONTINUED(status)) {
|
||||
DP_INFO << "Child continued, pid: " << pid;
|
||||
} else {
|
||||
DP_ERROR << "Wrong child status: " << status << ", pid: " << pid;
|
||||
}
|
||||
}
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#define MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_
|
||||
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <initializer_list>
|
||||
|
@ -61,8 +62,8 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
|
|||
DuplexPipe &operator>>(std::string &buf);
|
||||
|
||||
private:
|
||||
void SetTimeOut() { alarm_.Set(shared_from_this(), time_out_secs_); }
|
||||
void CancelTimeOut() { alarm_.Cancel(); }
|
||||
void SetTimeOut() { signal_handler_->SetAlarm(time_out_secs_); }
|
||||
void CancelTimeOut() { signal_handler_->CancelAlarm(); }
|
||||
void TimeOut() {
|
||||
if (has_time_out_callback_) {
|
||||
time_out_callback_();
|
||||
|
@ -96,27 +97,27 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
|
|||
int remote_stdout_;
|
||||
int remote_stderr_;
|
||||
|
||||
class Alarm {
|
||||
class SignalHandler {
|
||||
public:
|
||||
Alarm() = default;
|
||||
~Alarm() = default;
|
||||
SignalHandler(std::shared_ptr<DuplexPipe> dp, pid_t pid);
|
||||
~SignalHandler();
|
||||
|
||||
void Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs);
|
||||
void Cancel();
|
||||
void SetAlarm(unsigned int interval_secs);
|
||||
void CancelAlarm();
|
||||
|
||||
private:
|
||||
static void SigHandler(int sig) {
|
||||
DP_INFO << "Signal: " << sig;
|
||||
dp_->TimeOut();
|
||||
}
|
||||
static void SigAlarmHandler(int sig);
|
||||
static void SigPipeHandler(int sig);
|
||||
static void SigChildHandler(int sig);
|
||||
|
||||
inline static std::shared_ptr<DuplexPipe> dp_;
|
||||
inline static std::weak_ptr<DuplexPipe> dp_;
|
||||
inline static pid_t child_pid_;
|
||||
};
|
||||
|
||||
unsigned int time_out_secs_ = kTimeOutSeconds;
|
||||
bool has_time_out_callback_ = false;
|
||||
std::function<void()> time_out_callback_;
|
||||
Alarm alarm_;
|
||||
std::shared_ptr<SignalHandler> signal_handler_;
|
||||
};
|
||||
} // namespace mindspore
|
||||
|
||||
|
|
|
@ -40,9 +40,9 @@ DuplexPipe &DuplexPipe::operator>>(std::string &buf) { DP_EXCEPTION << "Not supp
|
|||
|
||||
void DuplexPipe::Close() { DP_EXCEPTION << "Not support for Windows by now."; }
|
||||
|
||||
void DuplexPipe::Alarm::Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs) {
|
||||
void DuplexPipe::SignalHandler::SetAlarm(unsigned int interval_secs) {
|
||||
DP_EXCEPTION << "Not support for Windows by now.";
|
||||
}
|
||||
|
||||
void DuplexPipe::Alarm::Cancel() { DP_EXCEPTION << "Not support for Windows by now."; }
|
||||
void DuplexPipe::SignalHandler::CancelAlarm() { DP_EXCEPTION << "Not support for Windows by now."; }
|
||||
} // namespace mindspore
|
||||
|
|
Loading…
Reference in New Issue