fixed Fatal python error

This commit is contained in:
chendongsheng 2021-04-07 10:01:57 +08:00
parent 3fe401feb5
commit 2a9f104ac0
9 changed files with 10 additions and 28 deletions

View File

@ -390,11 +390,9 @@ bool AbstractNode::Disconnect(const std::shared_ptr<TcpClient> &client, const ui
auto meta = std::make_shared<MessageMeta>();
meta->set_cmd(NodeCommand::FINISH);
FinishMessage finish_message;
finish_message.set_node_id(node_info_.node_id_);
std::string finish_message = node_info_.node_id_;
if (!SendMessageSync(client, meta, Protos::PROTOBUF, finish_message.SerializeAsString().data(),
finish_message.ByteSizeLong())) {
if (!SendMessageSync(client, meta, Protos::RAW, finish_message.data(), finish_message.length())) {
MS_LOG(WARNING) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << " send Finish Message timeout!";
}

View File

@ -142,9 +142,6 @@ bool CommUtil::Retry(const std::function<bool()> &func, size_t max_attempts, siz
void CommUtil::LogCallback(int severity, const char *msg) {
switch (severity) {
case EVENT_LOG_DEBUG:
MS_LOG(DEBUG) << kLibeventLogPrefix << msg;
break;
case EVENT_LOG_MSG:
MS_LOG(INFO) << kLibeventLogPrefix << msg;
break;
@ -155,7 +152,6 @@ void CommUtil::LogCallback(int severity, const char *msg) {
MS_LOG(ERROR) << kLibeventLogPrefix << msg;
break;
default:
MS_LOG(WARNING) << kLibeventLogPrefix << msg;
break;
}
}

View File

@ -110,7 +110,7 @@ void NodeManager::UpdateClusterState() {
}
}
if (!timeout_nodes_info_.empty()) {
is_cluster_timeout_ = true;
is_node_timeout_ = true;
for (auto it = timeout_nodes_info_.begin(); it != timeout_nodes_info_.end(); ++it) {
finish_nodes_id_.insert(it->first);
}
@ -138,9 +138,7 @@ void NodeManager::CheckClusterTimeout() {
}
}
void NodeManager::AddFinishNode(const FinishMessage &finish_message) {
finish_nodes_id_.insert(finish_message.node_id());
}
void NodeManager::AddFinishNode(const std::string &finish_message) { finish_nodes_id_.insert(finish_message); }
std::unordered_map<std::string, NodeInfo> NodeManager::nodes_info() { return nodes_info_; }

View File

@ -61,7 +61,7 @@ class NodeManager {
std::vector<ServersMeta> FetchServersMeta();
void UpdateClusterState();
void CheckClusterTimeout();
void AddFinishNode(const FinishMessage &finish_message);
void AddFinishNode(const std::string &finish_message);
std::unordered_map<std::string, NodeInfo> nodes_info();
bool is_cluster_ready();
bool is_cluster_finish();

View File

@ -139,10 +139,9 @@ void SchedulerNode::ProcessFinish(std::shared_ptr<TcpServer> server, std::shared
MS_EXCEPTION_IF_NULL(conn);
MS_EXCEPTION_IF_NULL(meta);
MS_EXCEPTION_IF_NULL(data);
FinishMessage finish_message;
finish_message.ParseFromArray(data, size);
node_manager_.AddFinishNode(finish_message);
MS_LOG(INFO) << "Process finish message from node id:" << finish_message.node_id();
auto finish_message = std::make_unique<std::string>(reinterpret_cast<const char *>(data), size);
node_manager_.AddFinishNode(*finish_message);
MS_LOG(INFO) << "Process finish message from node id:" << *finish_message;
server->SendMessage(conn, meta, Protos::PROTOBUF, data, size);
}

View File

@ -18,11 +18,6 @@
namespace mindspore {
namespace ps {
namespace core {
ServerNode::~ServerNode() {
MS_LOG(INFO) << "Stop server node!";
Stop();
}
bool ServerNode::Start(const uint32_t &timeout) {
MS_LOG(INFO) << "Start server node!";
Initialize();

View File

@ -36,7 +36,7 @@ namespace core {
class ServerNode : public AbstractNode {
public:
ServerNode() : server_(nullptr), server_thread_(nullptr) {}
~ServerNode() override;
~ServerNode() override = default;
bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override;
bool Stop() override;

View File

@ -20,10 +20,6 @@
namespace mindspore {
namespace ps {
namespace core {
WorkerNode::~WorkerNode() {
MS_LOG(INFO) << "Stop worker node!";
Stop();
}
bool WorkerNode::Start(const uint32_t &timeout) {
MS_LOG(INFO) << "Starting worker node!";
Initialize();

View File

@ -35,7 +35,7 @@ namespace core {
class WorkerNode : public AbstractNode {
public:
WorkerNode() = default;
~WorkerNode() override;
~WorkerNode() override = default;
bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override;
bool Stop() override;