Fix coredump problem of fl

This commit is contained in:
zhangzhaoju 2022-03-14 11:57:21 +08:00
parent f188616162
commit b3493ef131
7 changed files with 35 additions and 17 deletions

View File

@ -411,6 +411,7 @@ void Iteration::HandleNotifyLeaderMoveToNextIterRequest(const std::shared_ptr<ps
return;
}
std::unique_lock<std::mutex> lock(iter_move_mtx_);
NotifyLeaderMoveToNextIterRequest notify_leader_to_next_iter_req;
(void)notify_leader_to_next_iter_req.ParseFromArray(message->data(), SizeToInt(message->len()));
const auto &rank = notify_leader_to_next_iter_req.rank();

View File

@ -288,6 +288,9 @@ class Iteration {
std::atomic<IterationResult> iteration_result_;
nlohmann::json new_instance_json_;
// mutex for iter move to next, avoid core dump
std::mutex iter_move_mtx_;
};
} // namespace server
} // namespace fl

View File

@ -42,8 +42,8 @@ TcpClient::TcpClient(const std::string &address, std::uint16_t port)
buffer_event_(nullptr),
server_address_(std::move(address)),
server_port_(port),
is_stop_(true),
is_connected_(false) {
disconnected_(false),
connected_(false) {
message_handler_.SetCallback(
[this](const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size) {
if (message_callback_) {
@ -72,16 +72,16 @@ void TcpClient::set_connected_callback(const OnConnected &connected) { connected
bool TcpClient::WaitConnected(const uint32_t &connected_timeout) {
std::unique_lock<std::mutex> lock(connection_mutex_);
bool res = connection_cond_.wait_for(lock, std::chrono::seconds(connected_timeout),
[this] { return this->is_connected_.load(); });
[this] { return this->connected_.load(); });
return res;
}
void TcpClient::Init() {
std::lock_guard<std::mutex> lock(connection_mutex_);
if (buffer_event_) {
bufferevent_free(buffer_event_);
buffer_event_ = nullptr;
if (disconnected_) {
return;
}
std::lock_guard<std::mutex> lock(connection_mutex_);
if (!CommUtil::CheckIp(server_address_)) {
MS_LOG(EXCEPTION) << "The tcp client ip:" << server_address_ << " is illegal!";
}
@ -103,10 +103,10 @@ void TcpClient::Init() {
sin.sin_addr.s_addr = inet_addr(server_address_.c_str());
sin.sin_port = htons(server_port_);
if (!PSContext::instance()->enable_ssl()) {
if (!PSContext::instance()->enable_ssl() && buffer_event_ == nullptr) {
MS_LOG(INFO) << "SSL is disable.";
buffer_event_ = bufferevent_socket_new(event_base_, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
} else {
} else if (buffer_event_ == nullptr) {
if (!EstablishSSL()) {
MS_LOG(WARNING) << "Establish SSL failed.";
return;
@ -216,8 +216,8 @@ void TcpClient::TimerCallback(evutil_socket_t, int16_t, void *arg) {
}
void TcpClient::NotifyConnected() {
MS_LOG(INFO) << "Client connected to the server!";
is_connected_ = true;
MS_LOG(INFO) << "Client connected to the server! IP: " << server_address_ << ", port: " << server_port_;
connected_ = true;
connection_cond_.notify_all();
}

View File

@ -72,6 +72,7 @@ class TcpClient {
bool SendMessage(const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size);
void set_timer_callback(const OnTimer &timer);
const event_base &eventbase() const;
void set_disconnected() { disconnected_ = true; }
protected:
static void SetTcpNoDelay(const evutil_socket_t &fd);
@ -107,8 +108,8 @@ class TcpClient {
std::string server_address_;
std::uint16_t server_port_;
std::atomic<bool> is_stop_;
std::atomic<bool> is_connected_;
std::atomic<bool> disconnected_;
std::atomic<bool> connected_;
// The Configuration file
Configuration *config_;
};

View File

@ -38,10 +38,10 @@ uint32_t NodeManager::checkIfRankIdExist(const RegisterMessage &register_message
registered_nodes_info_[node_id].is_alive = true;
registered_nodes_info_[node_id].ip_ = new_ip;
registered_nodes_info_[node_id].port_ = static_cast<uint16_t>(new_port);
MS_LOG(INFO) << "The node id: " << node_id << " is already assigned!"
<< ", ip: " << register_message.ip() << ", port: " << register_message.port()
<< ", rank id: " << rank_id << ", alive: " << registered_nodes_info_[node_id].is_alive
<< ", the node_role:" << CommUtil::NodeRoleToString(registered_nodes_info_[node_id].node_role_);
MS_LOG(WARNING) << "The node id: " << node_id << " is already assigned!"
<< ", ip: " << register_message.ip() << ", port: " << register_message.port()
<< ", rank id: " << rank_id << ", alive: " << registered_nodes_info_[node_id].is_alive
<< ", the node_role:" << CommUtil::NodeRoleToString(registered_nodes_info_[node_id].node_role_);
return rank_id;
}
// This is for scheduler recovery

View File

@ -94,6 +94,12 @@ bool ServerNode::Finish(const uint32_t &timeout) {
return true;
}
if (!is_connected_to_scheduler_) {
MS_LOG(INFO) << "[Server finish]: Not connect to scheduler, no need to disconnect!";
return true;
}
client_to_scheduler_->set_disconnected();
MS_LOG(INFO) << "[Server finish]: 1. Begin to finish server node!";
bool res = Disconnect(client_to_scheduler_, timeout);
if (res) {

View File

@ -95,6 +95,13 @@ bool WorkerNode::Finish(const uint32_t &timeout) {
MS_LOG(INFO) << "The node is already stop.";
return true;
}
if (!is_connected_to_scheduler_) {
MS_LOG(INFO) << "[Worker finish]: Not connect to scheduler, no need to disconnect!";
return true;
}
client_to_scheduler_->set_disconnected();
bool res = Disconnect(client_to_scheduler_, timeout);
if (res) {
MS_LOG(INFO) << "[Worker finish]: 2. Successfully finish worker node!";