!31921 Fix LF coredump while start timeout
Merge pull request !31921 from zhangzhaoju/master_fl_dts
This commit is contained in:
commit
f90e5c855e
|
@ -22,6 +22,23 @@
|
|||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
AbstractNode::~AbstractNode() {
|
||||
if (client_to_scheduler_ != nullptr) {
|
||||
client_to_scheduler_->Stop();
|
||||
}
|
||||
if (client_to_scheduler_thread_ != nullptr && client_to_scheduler_thread_->joinable()) {
|
||||
client_to_scheduler_thread_->join();
|
||||
}
|
||||
if (heart_beat_thread_ != nullptr && heart_beat_thread_->joinable()) {
|
||||
heart_beat_thread_->join();
|
||||
}
|
||||
if (server_ != nullptr) {
|
||||
server_->Stop();
|
||||
}
|
||||
if (server_thread_ != nullptr && server_thread_->joinable()) {
|
||||
server_thread_->join();
|
||||
}
|
||||
}
|
||||
void AbstractNode::Register(const std::shared_ptr<TcpClient> &client) {
|
||||
MS_EXCEPTION_IF_NULL(client);
|
||||
auto message_meta = std::make_shared<MessageMeta>();
|
||||
|
@ -661,7 +678,6 @@ void AbstractNode::StartHeartbeatTimer(const std::shared_ptr<TcpClient> &client)
|
|||
}
|
||||
});
|
||||
MS_EXCEPTION_IF_NULL(heart_beat_thread_);
|
||||
heart_beat_thread_->detach();
|
||||
}
|
||||
|
||||
bool AbstractNode::Heartbeat(const std::shared_ptr<TcpClient> &client) {
|
||||
|
@ -1135,11 +1151,17 @@ bool AbstractNode::InitClientToScheduler() {
|
|||
}
|
||||
void AbstractNode::ConnectToScheduler() {
|
||||
client_to_scheduler_->Init();
|
||||
if (TcpClient::is_started()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (client_to_scheduler_thread_ != nullptr && client_to_scheduler_thread_->joinable()) {
|
||||
client_to_scheduler_thread_->join();
|
||||
}
|
||||
client_to_scheduler_thread_ = std::make_unique<std::thread>([this]() {
|
||||
MS_LOG(INFO) << "The node start a tcp client!";
|
||||
client_to_scheduler_->Start();
|
||||
});
|
||||
client_to_scheduler_thread_->detach();
|
||||
}
|
||||
|
||||
const std::shared_ptr<TcpClient> &AbstractNode::GetOrCreateTcpClient(const uint32_t &rank_id, const NodeRole &role) {
|
||||
|
@ -1463,7 +1485,6 @@ void AbstractNode::CreateTcpServer() {
|
|||
this->server_->Start();
|
||||
});
|
||||
MS_EXCEPTION_IF_NULL(server_thread_);
|
||||
server_thread_->detach();
|
||||
}
|
||||
|
||||
void AbstractNode::UpdateClusterState(const ClusterState &state) {
|
||||
|
|
|
@ -58,7 +58,7 @@ class BACKEND_EXPORT AbstractNode : public Node {
|
|||
persistent_state_(PersistentState::NOT_ENABLE_PERSIST),
|
||||
scheduler_ip_(""),
|
||||
scheduler_port_(0) {}
|
||||
~AbstractNode() override = default;
|
||||
~AbstractNode() override;
|
||||
|
||||
typedef void (AbstractNode::*ResponseHandler)(const std::shared_ptr<MessageMeta> &meta, const void *data,
|
||||
size_t size);
|
||||
|
|
|
@ -105,6 +105,10 @@ bool HttpRequestHandler::Stop() {
|
|||
MS_LOG(INFO) << "Stop http server!";
|
||||
|
||||
MS_EXCEPTION_IF_NULL(evbase_);
|
||||
if (event_base_got_break(evbase_)) {
|
||||
MS_LOG(INFO) << "The event base has already been stopped!";
|
||||
return true;
|
||||
}
|
||||
int ret = event_base_loopbreak(evbase_);
|
||||
if (ret != 0) {
|
||||
MS_LOG(ERROR) << "event base loop break failed!";
|
||||
|
|
|
@ -45,6 +45,11 @@ HttpServer::~HttpServer() {
|
|||
if (!Stop()) {
|
||||
MS_LOG(WARNING) << "Stop http server failed.";
|
||||
}
|
||||
for (size_t i = 0; i < worker_threads_.size(); i++) {
|
||||
if (worker_threads_[i] != nullptr && worker_threads_[i]->joinable()) {
|
||||
worker_threads_[i]->join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool HttpServer::InitServer() {
|
||||
|
@ -139,7 +144,7 @@ bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *functio
|
|||
return true;
|
||||
}
|
||||
|
||||
bool HttpServer::Start(bool is_detach) {
|
||||
bool HttpServer::Start() {
|
||||
MS_LOG(INFO) << "Start http server!";
|
||||
for (size_t i = 0; i < thread_num_; i++) {
|
||||
auto http_request_handler = std::make_shared<HttpRequestHandler>();
|
||||
|
@ -151,9 +156,6 @@ bool HttpServer::Start(bool is_detach) {
|
|||
http_request_handlers.push_back(http_request_handler);
|
||||
auto thread = std::make_shared<std::thread>(&HttpRequestHandler::Run, http_request_handler);
|
||||
MS_EXCEPTION_IF_NULL(thread);
|
||||
if (is_detach) {
|
||||
thread->detach();
|
||||
}
|
||||
worker_threads_.emplace_back(thread);
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -65,7 +65,7 @@ class HttpServer {
|
|||
// Return: true if success, false if failed, check log to find failure reason
|
||||
bool RegisterRoute(const std::string &url, OnRequestReceive *func);
|
||||
|
||||
bool Start(bool is_detach = true);
|
||||
bool Start();
|
||||
bool Wait();
|
||||
bool Stop();
|
||||
|
||||
|
|
|
@ -166,6 +166,11 @@ void TcpClient::StartWithDelay(int seconds) {
|
|||
void TcpClient::Stop() {
|
||||
MS_EXCEPTION_IF_NULL(event_base_);
|
||||
std::lock_guard<std::mutex> lock(connection_mutex_);
|
||||
if (event_base_got_break(event_base_)) {
|
||||
MS_LOG(DEBUG) << "The event base has already been stopped!";
|
||||
return;
|
||||
}
|
||||
|
||||
MS_LOG(INFO) << "Stop tcp client!";
|
||||
int ret = event_base_loopbreak(event_base_);
|
||||
if (ret != 0) {
|
||||
|
|
|
@ -73,6 +73,7 @@ class TcpClient {
|
|||
void set_timer_callback(const OnTimer &timer);
|
||||
const event_base &eventbase() const;
|
||||
int connection_status() { return connection_status_; }
|
||||
static bool is_started() { return is_started_; }
|
||||
|
||||
protected:
|
||||
static void SetTcpNoDelay(const evutil_socket_t &fd);
|
||||
|
|
|
@ -214,7 +214,7 @@ void TcpServer::Stop() {
|
|||
std::lock_guard<std::mutex> lock(connection_mutex_);
|
||||
MS_LOG(INFO) << "Stop tcp server!";
|
||||
if (event_base_got_break(base_)) {
|
||||
MS_LOG(DEBUG) << "The event base has stopped!";
|
||||
MS_LOG(DEBUG) << "The event base has already been stopped!";
|
||||
is_stop_ = true;
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -1459,7 +1459,7 @@ void SchedulerNode::StartRestfulServer(const std::string &address, std::uint16_t
|
|||
MS_LOG(EXCEPTION) << "The scheduler init http server failed.";
|
||||
}
|
||||
|
||||
if (!http_server_->Start(false)) {
|
||||
if (!http_server_->Start()) {
|
||||
MS_LOG(EXCEPTION) << "The scheduler start http server failed.";
|
||||
}
|
||||
restful_thread_ = std::make_unique<std::thread>([&]() { http_server_->Wait(); });
|
||||
|
|
Loading…
Reference in New Issue