forked from mindspore-Ecosystem/mindspore
!6024 Fix CodeDEX issues in DuplexPipe.
Merge pull request !6024 from 张清华/master2
This commit is contained in:
commit
7d3a9d78b7
|
@ -59,7 +59,7 @@ class KernelBuildClient {
|
||||||
// Exception's thrown if open failed
|
// Exception's thrown if open failed
|
||||||
if (dp_->Open({GetEnv(), GetScript()}, true) != -1) {
|
if (dp_->Open({GetEnv(), GetScript()}, true) != -1) {
|
||||||
dp_->SetTimeOutSeconds(kTimeOutSeconds);
|
dp_->SetTimeOutSeconds(kTimeOutSeconds);
|
||||||
dp_->SetTimeOutCallback(std::make_shared<std::function<void()>>([this]() { SendRequest(kFinish); }));
|
dp_->SetTimeOutCallback(std::make_shared<std::function<void()>>([this]() { Close(); }));
|
||||||
dp_->SetFinalizeCallback(std::make_shared<std::function<void()>>([this]() { Close(); }));
|
dp_->SetFinalizeCallback(std::make_shared<std::function<void()>>([this]() { Close(); }));
|
||||||
init_ = true;
|
init_ = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,6 @@ int DuplexPipe::Open(std::initializer_list<std::string> arg_list, bool append_fd
|
||||||
DP_INFO << "Remote process, pid: " << getpid() << ", " << fd1_[0] << "/" << fd2_[1];
|
DP_INFO << "Remote process, pid: " << getpid() << ", " << fd1_[0] << "/" << fd2_[1];
|
||||||
remote_stdout_ = dup(STDOUT_FILENO);
|
remote_stdout_ = dup(STDOUT_FILENO);
|
||||||
remote_stdin_ = dup(STDIN_FILENO);
|
remote_stdin_ = dup(STDIN_FILENO);
|
||||||
remote_stderr_ = dup(STDERR_FILENO);
|
|
||||||
close(fd1_[1]);
|
close(fd1_[1]);
|
||||||
close(fd2_[0]);
|
close(fd2_[0]);
|
||||||
if (!append_fds) {
|
if (!append_fds) {
|
||||||
|
@ -67,7 +66,6 @@ int DuplexPipe::Open(std::initializer_list<std::string> arg_list, bool append_fd
|
||||||
DP_INFO << "Local process, id: " << getpid() << ", " << fd2_[0] << "/" << fd1_[1];
|
DP_INFO << "Local process, id: " << getpid() << ", " << fd2_[0] << "/" << fd1_[1];
|
||||||
local_stdout_ = dup(STDOUT_FILENO);
|
local_stdout_ = dup(STDOUT_FILENO);
|
||||||
local_stdin_ = dup(STDIN_FILENO);
|
local_stdin_ = dup(STDIN_FILENO);
|
||||||
local_stderr_ = dup(STDERR_FILENO);
|
|
||||||
close(fd1_[0]);
|
close(fd1_[0]);
|
||||||
close(fd2_[1]);
|
close(fd2_[1]);
|
||||||
|
|
||||||
|
@ -95,18 +93,21 @@ void DuplexPipe::Write(const std::string &buf, bool flush) {
|
||||||
std::string DuplexPipe::Read() {
|
std::string DuplexPipe::Read() {
|
||||||
// Read the string from pipe
|
// Read the string from pipe
|
||||||
std::string buf;
|
std::string buf;
|
||||||
ssize_t size;
|
|
||||||
// MAYBE BLOCKED
|
|
||||||
// Read one line or multiple lines
|
// Read one line or multiple lines
|
||||||
while (SetTimeOut(), (size = read(fd2_[0], c_buf_, kBufferSize)) > 0) { // Till reading something
|
while (1) {
|
||||||
|
SetTimeOut();
|
||||||
|
ssize_t size = read(fd2_[0], c_buf_, kBufferSize); // MAYBE BLOCKED, Till reading something
|
||||||
|
if (size <= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
CancelTimeOut();
|
CancelTimeOut();
|
||||||
DP_DEBUG << ">> [" << c_buf_ << "]";
|
|
||||||
bool line_end = c_buf_[size - 1] == '\n';
|
bool line_end = c_buf_[size - 1] == '\n';
|
||||||
buf.append(c_buf_, line_end ? size - 1 : size); // Copy without the last '\n'
|
buf.append(c_buf_, line_end ? size - 1 : size); // Copy without the last '\n'
|
||||||
if (line_end) {
|
if (line_end) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
DP_DEBUG << ">> [" << buf << "]";
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,15 +168,15 @@ void DuplexPipe::SignalHandler::CancelAlarm() { alarm(0); }
|
||||||
|
|
||||||
void DuplexPipe::SignalHandler::SigAlarmHandler(int sig) {
|
void DuplexPipe::SignalHandler::SigAlarmHandler(int sig) {
|
||||||
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
||||||
if (!dp_.expired()) {
|
if (dp_ != nullptr) {
|
||||||
dp_.lock()->NotifyTimeOut();
|
dp_->NotifyTimeOut();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DuplexPipe::SignalHandler::SigPipeHandler(int sig) {
|
void DuplexPipe::SignalHandler::SigPipeHandler(int sig) {
|
||||||
DP_INFO << "Signal: " << sig;
|
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
||||||
if (!dp_.expired()) {
|
if (dp_ != nullptr) {
|
||||||
dp_.lock()->Close();
|
dp_->NotifyFinalize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,19 +184,11 @@ void DuplexPipe::SignalHandler::SigChildHandler(int sig) {
|
||||||
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_;
|
||||||
int status;
|
int status;
|
||||||
auto pid = waitpid(child_pid_, &status, WNOHANG | WUNTRACED);
|
auto pid = waitpid(child_pid_, &status, WNOHANG | WUNTRACED);
|
||||||
if (WIFEXITED(status)) {
|
if (WIFEXITED(status)) { // Normal exit
|
||||||
DP_INFO << "Child exited, status: " << WEXITSTATUS(status) << ", pid: " << pid << ", dp expired: " << dp_.expired();
|
DP_INFO << "Child exited, status: " << WEXITSTATUS(status) << ", pid: " << pid << ", dp: " << dp_;
|
||||||
if (pid > 0 && !dp_.expired()) {
|
if (pid > 0 && dp_ != nullptr) { // It's child_pid_
|
||||||
dp_.lock()->NotifyFinalize();
|
dp_->NotifyFinalize();
|
||||||
}
|
}
|
||||||
} else if (WIFSTOPPED(status)) {
|
|
||||||
DP_INFO << "Child stopped, sig: " << WSTOPSIG(status) << ", pid: " << pid;
|
|
||||||
} else if (WIFSIGNALED(status)) {
|
|
||||||
DP_INFO << "Child not exited, signaled, sig: " << WTERMSIG(status) << ", pid: " << pid;
|
|
||||||
} else if (WIFCONTINUED(status)) {
|
|
||||||
DP_INFO << "Child continued, pid: " << pid;
|
|
||||||
} else {
|
|
||||||
DP_INFO << "Wrong child status: " << status << ", pid: " << pid;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -60,8 +60,16 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
|
||||||
DuplexPipe &operator>>(std::string &buf);
|
DuplexPipe &operator>>(std::string &buf);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void SetTimeOut() { signal_handler_->SetAlarm(time_out_secs_); }
|
void SetTimeOut() {
|
||||||
void CancelTimeOut() { signal_handler_->CancelAlarm(); }
|
if (signal_handler_ != nullptr) {
|
||||||
|
signal_handler_->SetAlarm(time_out_secs_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void CancelTimeOut() {
|
||||||
|
if (signal_handler_ != nullptr) {
|
||||||
|
signal_handler_->CancelAlarm();
|
||||||
|
}
|
||||||
|
}
|
||||||
void NotifyTimeOut() {
|
void NotifyTimeOut() {
|
||||||
if (time_out_callback_ != nullptr) {
|
if (time_out_callback_ != nullptr) {
|
||||||
(*time_out_callback_)();
|
(*time_out_callback_)();
|
||||||
|
@ -96,10 +104,8 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
|
||||||
|
|
||||||
int local_stdin_;
|
int local_stdin_;
|
||||||
int local_stdout_;
|
int local_stdout_;
|
||||||
int local_stderr_;
|
|
||||||
int remote_stdin_;
|
int remote_stdin_;
|
||||||
int remote_stdout_;
|
int remote_stdout_;
|
||||||
int remote_stderr_;
|
|
||||||
|
|
||||||
class SignalHandler {
|
class SignalHandler {
|
||||||
public:
|
public:
|
||||||
|
@ -114,7 +120,7 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
|
||||||
static void SigPipeHandler(int sig);
|
static void SigPipeHandler(int sig);
|
||||||
static void SigChildHandler(int sig);
|
static void SigChildHandler(int sig);
|
||||||
|
|
||||||
inline static std::weak_ptr<DuplexPipe> dp_;
|
inline static std::shared_ptr<DuplexPipe> dp_;
|
||||||
inline static pid_t child_pid_;
|
inline static pid_t child_pid_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue