forked from mindspore-Ecosystem/mindspore
!17803 fixed terminate called without an activate exception
From: @anancds Reviewed-by: @cristoval,@wilfchen,@limingqi107 Signed-off-by: @limingqi107
This commit is contained in:
commit
729dbd752f
|
@ -20,6 +20,11 @@
|
|||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
CommunicatorBase::~CommunicatorBase() {
|
||||
running_ = false;
|
||||
Join();
|
||||
}
|
||||
|
||||
bool CommunicatorBase::SendResponse(const void *rsp_data, size_t rsp_len, std::shared_ptr<MessageHandler> msg_handler) {
|
||||
// The rsp_len could be 0 because of ProtoBuffer's feature.
|
||||
if (rsp_data == nullptr || msg_handler == nullptr) {
|
||||
|
|
|
@ -42,9 +42,9 @@ class CommunicatorBase {
|
|||
using OnNodeEventCallback = std::function<void(const ClusterEvent &)>;
|
||||
using TcpMsgCallback = std::function<void(std::shared_ptr<core::TcpConnection> conn,
|
||||
std::shared_ptr<core::MessageMeta> meta, DataPtr data, size_t size)>;
|
||||
CommunicatorBase() = default;
|
||||
CommunicatorBase() : running_(false) {}
|
||||
|
||||
virtual ~CommunicatorBase() = default;
|
||||
virtual ~CommunicatorBase();
|
||||
|
||||
virtual bool Start() = 0;
|
||||
virtual bool Stop() = 0;
|
||||
|
@ -59,6 +59,7 @@ class CommunicatorBase {
|
|||
protected:
|
||||
std::unordered_map<std::string, MessageCallback> msg_callbacks_;
|
||||
std::thread running_thread_;
|
||||
bool running_;
|
||||
};
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
|
|
|
@ -31,10 +31,8 @@ bool HttpCommunicator::Start() {
|
|||
MS_LOG(INFO) << "Http communicator started.";
|
||||
|
||||
running_thread_ = std::thread([&]() {
|
||||
try {
|
||||
http_server_->Wait();
|
||||
} catch (const std::exception &e) {
|
||||
MsException::Instance().SetException();
|
||||
while (running_) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
});
|
||||
return true;
|
||||
|
@ -42,7 +40,9 @@ bool HttpCommunicator::Start() {
|
|||
|
||||
bool HttpCommunicator::Stop() {
|
||||
MS_EXCEPTION_IF_NULL(http_server_);
|
||||
return http_server_->Stop();
|
||||
bool res = http_server_->Stop();
|
||||
running_ = false;
|
||||
return res;
|
||||
}
|
||||
|
||||
void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
|
||||
|
|
|
@ -115,13 +115,17 @@ bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *functio
|
|||
return true;
|
||||
}
|
||||
|
||||
bool HttpServer::Start() {
|
||||
bool HttpServer::Start(bool is_detach) {
|
||||
MS_LOG(INFO) << "Start http server!";
|
||||
for (size_t i = 0; i < thread_num_; i++) {
|
||||
auto http_request_handler = std::make_shared<HttpRequestHandler>();
|
||||
http_request_handler->Initialize(fd_, request_handlers_);
|
||||
http_request_handlers.push_back(http_request_handler);
|
||||
worker_threads_.emplace_back(std::make_shared<std::thread>(&HttpRequestHandler::Run, http_request_handler));
|
||||
auto thread = std::make_shared<std::thread>(&HttpRequestHandler::Run, http_request_handler);
|
||||
if (is_detach) {
|
||||
thread->detach();
|
||||
}
|
||||
worker_threads_.emplace_back(thread);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ class HttpServer {
|
|||
// Return: true if success, false if failed, check log to find failure reason
|
||||
bool RegisterRoute(const std::string &url, OnRequestReceive *func);
|
||||
|
||||
bool Start();
|
||||
bool Start(bool is_detach = true);
|
||||
bool Wait();
|
||||
bool Stop();
|
||||
|
||||
|
|
|
@ -65,7 +65,6 @@ class TcpCommunicator : public CommunicatorBase {
|
|||
public:
|
||||
explicit TcpCommunicator(const std::shared_ptr<TaskExecutor> &task_executor, ServerNode *node)
|
||||
: task_executor_(task_executor),
|
||||
running_(false),
|
||||
server_num_(0),
|
||||
worker_num_(0),
|
||||
scheduler_ip_(""),
|
||||
|
@ -109,7 +108,6 @@ class TcpCommunicator : public CommunicatorBase {
|
|||
|
||||
private:
|
||||
std::shared_ptr<TaskExecutor> task_executor_;
|
||||
bool running_;
|
||||
|
||||
TcpMsgCallback tcp_msg_callback_;
|
||||
OnNodeEventCallback event_callback_;
|
||||
|
|
|
@ -429,6 +429,7 @@ void SchedulerNode::ProcessScaleOut(std::shared_ptr<HttpMessageHandler> resp) {
|
|||
nlohmann::json js;
|
||||
js["message"] = "Cluster begin to scale out.";
|
||||
resp->AddRespString(js.dump());
|
||||
resp->AddRespHeadParam("Content_Type", "application/json");
|
||||
|
||||
resp->SetRespCode(HTTP_OK);
|
||||
resp->SendResponse();
|
||||
|
@ -507,6 +508,7 @@ void SchedulerNode::ProcessScaleIn(std::shared_ptr<HttpMessageHandler> resp) {
|
|||
nlohmann::json js;
|
||||
js["message"] = "Cluster begin to scale in.";
|
||||
resp->AddRespString(js.dump());
|
||||
resp->AddRespHeadParam("Content_Type", "application/json");
|
||||
|
||||
resp->SetRespCode(HTTP_OK);
|
||||
resp->SendResponse();
|
||||
|
@ -543,6 +545,7 @@ void SchedulerNode::ProcessGetNodesInfo(std::shared_ptr<HttpMessageHandler> resp
|
|||
}
|
||||
|
||||
resp->AddRespString(js.dump());
|
||||
resp->AddRespHeadParam("Content_Type", "application/json");
|
||||
|
||||
resp->SetRespCode(HTTP_OK);
|
||||
resp->SendResponse();
|
||||
|
@ -562,6 +565,7 @@ void SchedulerNode::ProcessGetClusterState(std::shared_ptr<HttpMessageHandler> r
|
|||
js["cluster_state"] = CommUtil::ClusterStateToString(cluster_state);
|
||||
|
||||
resp->AddRespString(js.dump());
|
||||
resp->AddRespHeadParam("Content_Type", "application/json");
|
||||
|
||||
resp->SetRespCode(HTTP_OK);
|
||||
resp->SendResponse();
|
||||
|
@ -601,6 +605,13 @@ RequestProcessResult SchedulerNode::CheckIfNodeIdLegal(const std::vector<std::st
|
|||
ERROR_STATUS(result, RequestProcessResultCode::kInvalidInputs, error_message);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (node_infos[val].node_role_ == NodeRole::WORKER) {
|
||||
std::string error_message = "The node id:" + val + " is the role of worker, should not be scale in.";
|
||||
MS_LOG(ERROR) << error_message;
|
||||
ERROR_STATUS(result, RequestProcessResultCode::kInvalidInputs, error_message);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -628,7 +639,7 @@ void SchedulerNode::StartRestfulServer(const std::string &address, std::uint16_t
|
|||
|
||||
http_server_->InitServer();
|
||||
|
||||
http_server_->Start();
|
||||
http_server_->Start(false);
|
||||
restful_thread_ = std::make_unique<std::thread>([&]() { http_server_->Wait(); });
|
||||
}
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ void ServerNode::CreateTcpServer() {
|
|||
MS_LOG(INFO) << "The server node start a tcp server!";
|
||||
this->server_->Start();
|
||||
});
|
||||
server_thread_->detach();
|
||||
}
|
||||
|
||||
void ServerNode::Initialize() {
|
||||
|
@ -158,20 +159,13 @@ bool ServerNode::Stop() {
|
|||
if (!is_already_stopped_.load()) {
|
||||
is_already_stopped_ = true;
|
||||
is_finish_ = true;
|
||||
if (heart_beat_thread_->joinable()) {
|
||||
heart_beat_thread_->join();
|
||||
}
|
||||
client_to_scheduler_->Stop();
|
||||
if (!connected_nodes_.empty()) {
|
||||
for (auto &connected_node : connected_nodes_) {
|
||||
connected_node.second->Stop();
|
||||
}
|
||||
}
|
||||
if (client_to_scheduler_thread_->joinable()) {
|
||||
client_to_scheduler_thread_->join();
|
||||
}
|
||||
server_->Stop();
|
||||
server_thread_->join();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ void WorkerNode::CreateTcpServer() {
|
|||
MS_LOG(INFO) << "The worker node start a tcp server!";
|
||||
server_->Start();
|
||||
});
|
||||
server_thread_->detach();
|
||||
}
|
||||
|
||||
bool WorkerNode::Stop() {
|
||||
|
@ -82,7 +83,6 @@ bool WorkerNode::Stop() {
|
|||
}
|
||||
}
|
||||
server_->Stop();
|
||||
server_thread_->join();
|
||||
is_already_stopped_ = true;
|
||||
}
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue