added log callback

This commit is contained in:
chendongsheng 2021-02-22 16:10:50 +08:00
parent cab5ece4e9
commit 425e5cae49
25 changed files with 292 additions and 217 deletions

View File

@ -12,7 +12,7 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_client.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_message_handler.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_server.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/cluster_config.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/cluster_metadata.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc")
list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc")

View File

@ -107,8 +107,8 @@ bool AbstractNode::Send(const NodeRole &node_role, const std::vector<uint32_t> &
const uint32_t &timeout) {
uint64_t request_id = AddMessageTrack(data.size());
if (rank_ids.size() != data.size()) {
MS_LOG(EXCEPTION) << "The number of rank ids is not equal to the number of data!";
if (rank_ids.size() != data.size() || rank_ids.size() != lens.size()) {
MS_LOG(EXCEPTION) << "The number of rank ids, data and lens are not equal!";
}
for (size_t it = 0; it < rank_ids.size(); ++it) {
if (!CommUtil::ValidateRankId(node_role, rank_ids.at(it))) {
@ -235,10 +235,8 @@ uint64_t AbstractNode::CollectiveSendAsync(const enum NodeRole &node_role, const
}
std::pair<uint32_t, uint64_t> AbstractNode::CollectiveReceiveAsync(const enum NodeRole &node_role,
const uint32_t &rank_id, void **output,
size_t *size) {
const uint32_t &rank_id, VectorPtr *output) {
MS_EXCEPTION_IF_NULL(output);
MS_EXCEPTION_IF_NULL(size);
if (!CommUtil::ValidateRankId(node_role, rank_id)) {
MS_LOG(EXCEPTION) << "The node role or rank_id is illegal!";
}
@ -248,8 +246,7 @@ std::pair<uint32_t, uint64_t> AbstractNode::CollectiveReceiveAsync(const enum No
receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = false;
if (received_data_.count(std::make_pair(rank_id, rank_request_id)) > 0) {
auto res = received_data_[std::make_pair(rank_id, rank_request_id)];
*output = res->data();
*size = res->size();
*output = res;
received_data_.erase(std::make_pair(rank_id, rank_request_id));
receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true;
MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id;
@ -257,8 +254,7 @@ std::pair<uint32_t, uint64_t> AbstractNode::CollectiveReceiveAsync(const enum No
receive_callbacks_[std::make_pair(rank_id, rank_request_id)] = [=]() mutable {
receive_callbacks_mutex_.lock();
auto res = received_data_[std::make_pair(rank_id, rank_request_id)];
*output = res->data();
*size = res->size();
*output = res;
received_data_.erase(std::make_pair(rank_id, rank_request_id));
receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true;
MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id;
@ -295,7 +291,7 @@ void AbstractNode::StartHeartbeatTimer(const std::shared_ptr<TcpClient> &client)
} else {
UpdateSchedulerTime();
}
std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval()));
std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval()));
}
});
}
@ -327,7 +323,7 @@ void AbstractNode::UpdateSchedulerTime() {
bool AbstractNode::CheckSchedulerTimeout() const {
struct timeval current_time {};
(void)gettimeofday(&current_time, nullptr);
if (scheduler_time_.tv_sec + ClusterConfig::scheduler_timeout() < current_time.tv_sec) {
if (scheduler_time_.tv_sec + ClusterMetadata::instance()->scheduler_timeout() < current_time.tv_sec) {
return true;
}
return false;
@ -414,8 +410,8 @@ bool AbstractNode::WaitForDisconnect(const uint32_t &timeout) {
}
bool AbstractNode::InitClientToScheduler() {
std::string scheduler_host = ClusterConfig::scheduler_host();
uint16_t scheduler_port = ClusterConfig::scheduler_port();
std::string scheduler_host = ClusterMetadata::instance()->scheduler_host();
uint16_t scheduler_port = ClusterMetadata::instance()->scheduler_port();
client_to_scheduler_ = std::make_shared<TcpClient>(scheduler_host, scheduler_port);
client_to_scheduler_->SetMessageCallback(
[&](std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) {
@ -436,7 +432,7 @@ bool AbstractNode::InitClientToScheduler() {
});
client_to_scheduler_->set_disconnected_callback([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(ClusterConfig::connect_interval()));
std::this_thread::sleep_for(std::chrono::milliseconds(ClusterMetadata::instance()->connect_interval()));
client_to_scheduler_->Init();
});
return client_to_scheduler_->WaitConnected();
@ -507,8 +503,7 @@ bool AbstractNode::SendMessageSync(const std::shared_ptr<TcpClient> &client, std
client->SendMessage(meta, protos, data, size);
MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< ", the node id is:" << node_info_.node_id_ << " send the request id is:" << request_id;
bool res = Wait(request_id, timeout);
return res;
return Wait(request_id, timeout);
}
void AbstractNode::ProcessSendDataResp(std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data,
@ -589,7 +584,7 @@ void AbstractNode::RunReceiveCallback(std::shared_ptr<MessageMeta> meta, const P
}
received_data_[std::make_pair(rank_id, rank_request_id)] = received_data;
MS_LOG(DEBUG) << "Run Receive data callback,the rank id:" << rank_id << ", the rank request id is:" << rank_request_id
<< ", the send request id is:" << meta->request_id();
<< ", the send request id is:" << meta->request_id() << " the size is:" << size;
auto it = receive_callbacks_.find(std::make_pair(rank_id, rank_request_id));
if (it != receive_callbacks_.end()) {
receive_callbacks_mutex_.unlock();

View File

@ -57,7 +57,7 @@ class AbstractNode : public Node {
uint64_t CollectiveSendAsync(const enum NodeRole &node_role, const uint32_t &rank_id, const void *data, size_t size);
std::pair<uint32_t, uint64_t> CollectiveReceiveAsync(const enum NodeRole &node_role, const uint32_t &rank_id,
void **output, size_t *size);
VectorPtr *output);
bool CollectiveWait(std::pair<uint32_t, uint64_t> request_id, const uint32_t &timeout = kCommTimeoutInSeconds);
protected:

View File

@ -1,85 +0,0 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/cluster_config.h"
#include <string>
namespace mindspore {
namespace ps {
namespace core {
uint32_t ClusterConfig::worker_num_ = 0;
uint32_t ClusterConfig::server_num_ = 0;
std::unique_ptr<std::string> ClusterConfig::scheduler_host_ = nullptr;
uint16_t ClusterConfig::scheduler_port_ = 0;
// The interval for sending heartbeat packets between worker node,server node and scheduler node is 3 seconds.
uint32_t ClusterConfig::heartbeat_interval_ = 3;
// The timeout for worker node and server node sending heartbeat packets to scheduler node is 30 seconds.
uint32_t ClusterConfig::heartbeat_timeout_ = 30;
// Timeout period for cluster preparation is 300 seconds.
uint32_t ClusterConfig::cluster_available_timeout_ = 300;
// The timeout period for the client to connect to the server is 100ms.
uint32_t ClusterConfig::connect_interval_ = 100;
// When the scheduler exits, the worker and server can continue to work for 5 hours
uint32_t ClusterConfig::scheduler_timeout_ = 3600 * 5;
void ClusterConfig::Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host,
const uint16_t &scheduler_port) {
worker_num_ = worker_num;
server_num_ = server_num;
if (!CommUtil::CheckIp(scheduler_host)) {
MS_LOG(EXCEPTION) << "The scheduler_host:" << scheduler_host << " is illegal!";
}
scheduler_host_ = std::make_unique<std::string>(scheduler_host);
scheduler_port_ = scheduler_port;
}
uint32_t ClusterConfig::worker_num() { return worker_num_; }
uint32_t ClusterConfig::server_num() { return server_num_; }
uint32_t ClusterConfig::heartbeat_interval() { return heartbeat_interval_; }
void ClusterConfig::set_heartbeat_interval(const uint32_t &heartbeat_interval) {
heartbeat_interval_ = heartbeat_interval;
}
std::string ClusterConfig::scheduler_host() { return *scheduler_host_; }
uint16_t ClusterConfig::scheduler_port() { return scheduler_port_; }
uint32_t ClusterConfig::heartbeat_timeout() { return heartbeat_timeout_; }
void ClusterConfig::set_heartbeat_timeout(const uint32_t &heartbeat_timeout) {
heartbeat_interval_ = heartbeat_timeout;
}
uint32_t ClusterConfig::cluster_available_timeout() { return cluster_available_timeout_; }
void ClusterConfig::set_cluster_available_timeout(const uint32_t &cluster_available_timeout) {
cluster_available_timeout_ = cluster_available_timeout;
}
uint32_t ClusterConfig::connect_interval() { return connect_interval_; }
void ClusterConfig::set_connect_interval(const uint32_t &connect_interval) { connect_interval_ = connect_interval; }
uint32_t ClusterConfig::scheduler_timeout() { return scheduler_timeout_; }
void ClusterConfig::set_scheduler_timeout(const uint32_t &scheduler_timeout) { scheduler_timeout_ = scheduler_timeout; }
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -1,65 +0,0 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_
#define MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_
#include <string>
#include <iostream>
#include <memory>
#include <utility>
#include "utils/log_adapter.h"
#include "ps/core/comm_util.h"
namespace mindspore {
namespace ps {
namespace core {
class ClusterConfig {
public:
static void Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host,
const uint16_t &scheduler_port);
static uint32_t worker_num();
static uint32_t server_num();
static uint32_t heartbeat_interval();
static void set_heartbeat_interval(const uint32_t &heartbeat_interval);
static std::string scheduler_host();
static uint16_t scheduler_port();
static uint32_t heartbeat_timeout();
static void set_heartbeat_timeout(const uint32_t &heartbeat_timeout);
static uint32_t cluster_available_timeout();
static void set_cluster_available_timeout(const uint32_t &cluster_available_timeout);
static uint32_t connect_interval();
static void set_connect_interval(const uint32_t &connect_interval);
static uint32_t scheduler_timeout();
static void set_scheduler_timeout(const uint32_t &scheduler_timeout);
private:
static uint32_t worker_num_;
static uint32_t server_num_;
static uint32_t heartbeat_interval_;
static std::unique_ptr<std::string> scheduler_host_;
static uint16_t scheduler_port_;
static uint32_t heartbeat_timeout_;
static uint32_t cluster_available_timeout_;
static uint32_t connect_interval_;
static uint32_t scheduler_timeout_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_

View File

@ -0,0 +1,82 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/cluster_metadata.h"
#include <string>
namespace mindspore {
namespace ps {
namespace core {
std::shared_ptr<ClusterMetadata> ClusterMetadata::instance() {
static std::shared_ptr<ClusterMetadata> metadata_instance = nullptr;
if (metadata_instance == nullptr) {
metadata_instance.reset(new (std::nothrow) ClusterMetadata());
}
return metadata_instance;
}
void ClusterMetadata::Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host,
const uint16_t &scheduler_port) {
worker_num_ = worker_num;
server_num_ = server_num;
if (!CommUtil::CheckIp(scheduler_host)) {
MS_LOG(EXCEPTION) << "The scheduler_host:" << scheduler_host << " is illegal!";
}
scheduler_host_ = std::make_unique<std::string>(scheduler_host);
scheduler_port_ = scheduler_port;
}
uint32_t ClusterMetadata::worker_num() { return worker_num_; }
uint32_t ClusterMetadata::server_num() { return server_num_; }
uint32_t ClusterMetadata::heartbeat_interval() { return heartbeat_interval_; }
void ClusterMetadata::set_heartbeat_interval(const uint32_t &heartbeat_interval) {
heartbeat_interval_ = heartbeat_interval;
}
std::string ClusterMetadata::scheduler_host() {
MS_EXCEPTION_IF_NULL(scheduler_host_);
return *scheduler_host_;
}
uint16_t ClusterMetadata::scheduler_port() { return scheduler_port_; }
uint32_t ClusterMetadata::heartbeat_timeout() { return heartbeat_timeout_; }
void ClusterMetadata::set_heartbeat_timeout(const uint32_t &heartbeat_timeout) {
heartbeat_interval_ = heartbeat_timeout;
}
uint32_t ClusterMetadata::cluster_available_timeout() { return cluster_available_timeout_; }
void ClusterMetadata::set_cluster_available_timeout(const uint32_t &cluster_available_timeout) {
cluster_available_timeout_ = cluster_available_timeout;
}
uint32_t ClusterMetadata::connect_interval() { return connect_interval_; }
void ClusterMetadata::set_connect_interval(const uint32_t &connect_interval) { connect_interval_ = connect_interval; }
uint32_t ClusterMetadata::scheduler_timeout() { return scheduler_timeout_; }
void ClusterMetadata::set_scheduler_timeout(const uint32_t &scheduler_timeout) {
scheduler_timeout_ = scheduler_timeout;
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,84 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_
#define MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_
#include <string>
#include <iostream>
#include <memory>
#include <utility>
#include "utils/log_adapter.h"
#include "ps/core/comm_util.h"
namespace mindspore {
namespace ps {
namespace core {
class ClusterMetadata {
public:
~ClusterMetadata() = default;
ClusterMetadata(ClusterMetadata const &) = delete;
ClusterMetadata &operator=(const ClusterMetadata &) = delete;
static std::shared_ptr<ClusterMetadata> instance();
void Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host,
const uint16_t &scheduler_port);
uint32_t worker_num();
uint32_t server_num();
uint32_t heartbeat_interval();
void set_heartbeat_interval(const uint32_t &heartbeat_interval);
std::string scheduler_host();
uint16_t scheduler_port();
uint32_t heartbeat_timeout();
void set_heartbeat_timeout(const uint32_t &heartbeat_timeout);
uint32_t cluster_available_timeout();
void set_cluster_available_timeout(const uint32_t &cluster_available_timeout);
uint32_t connect_interval();
void set_connect_interval(const uint32_t &connect_interval);
uint32_t scheduler_timeout();
void set_scheduler_timeout(const uint32_t &scheduler_timeout);
private:
ClusterMetadata()
: worker_num_(0),
server_num_(0),
heartbeat_interval_(3),
scheduler_host_(nullptr),
scheduler_port_(0),
heartbeat_timeout_(30),
cluster_available_timeout_(300),
connect_interval_(100),
scheduler_timeout_(3600 * 5) {}
uint32_t worker_num_;
uint32_t server_num_;
// The interval for sending heartbeat packets between worker node,server node and scheduler node is 3 seconds.
uint32_t heartbeat_interval_;
std::unique_ptr<std::string> scheduler_host_;
uint16_t scheduler_port_;
// The timeout for worker node and server node sending heartbeat packets to scheduler node is 30 seconds.
uint32_t heartbeat_timeout_;
// Timeout period for cluster preparation is 300 seconds.
uint32_t cluster_available_timeout_;
// The timeout period for the client to connect to the server is 100ms.
uint32_t connect_interval_;
// When the scheduler exits, the worker and server can continue to work for 5 hours
uint32_t scheduler_timeout_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_

View File

@ -122,9 +122,9 @@ std::string CommUtil::NodeRoleToString(const NodeRole &role) {
}
}
bool CommUtil::ValidateRankId(const enum NodeRole &node_role, const uint32_t &rank_id) {
if (node_role == NodeRole::SERVER && (rank_id > ClusterConfig::server_num() - 1)) {
if (node_role == NodeRole::SERVER && (rank_id > ClusterMetadata::instance()->server_num() - 1)) {
return false;
} else if (node_role == NodeRole::WORKER && (rank_id > ClusterConfig::worker_num() - 1)) {
} else if (node_role == NodeRole::WORKER && (rank_id > ClusterMetadata::instance()->worker_num() - 1)) {
return false;
}
return true;
@ -139,6 +139,26 @@ bool CommUtil::Retry(const std::function<bool()> &func, size_t max_attempts, siz
}
return false;
}
void CommUtil::LogCallback(int severity, const char *msg) {
switch (severity) {
case EVENT_LOG_DEBUG:
MS_LOG(DEBUG) << kLibeventLogPrefix << msg;
break;
case EVENT_LOG_MSG:
MS_LOG(INFO) << kLibeventLogPrefix << msg;
break;
case EVENT_LOG_WARN:
MS_LOG(WARNING) << kLibeventLogPrefix << msg;
break;
case EVENT_LOG_ERR:
MS_LOG(ERROR) << kLibeventLogPrefix << msg;
break;
default:
MS_LOG(WARNING) << kLibeventLogPrefix << msg;
break;
}
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -49,7 +49,7 @@
#include "proto/comm.pb.h"
#include "proto/ps.pb.h"
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "utils/log_adapter.h"
namespace mindspore {
@ -65,6 +65,7 @@ constexpr int kGroup5RandomLength = 12;
constexpr int kMessageChunkLength = 4096;
// The timeout period for the http client to connect to the http server is 120 seconds.
constexpr int kConnectionTimeout = 120;
constexpr char kLibeventLogPrefix[] = "[libevent log]:";
class CommUtil {
public:
@ -75,6 +76,7 @@ class CommUtil {
static std::string NodeRoleToString(const NodeRole &role);
static bool ValidateRankId(const enum NodeRole &node_role, const uint32_t &rank_id);
static bool Retry(const std::function<bool()> &func, size_t max_attempts, size_t interval_milliseconds);
static void LogCallback(int severity, const char *msg);
private:
static std::random_device rd;

View File

@ -30,11 +30,10 @@
#include <utility>
#include <tuple>
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "ps/core/node_info.h"
#include "ps/core/tcp_client.h"
#include "ps/core/tcp_server.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace ps {
@ -55,7 +54,7 @@ class Node {
using OnNodeEventMessage = std::function<void(const NodeEvent &event)>;
using MessageCallback = std::function<void()>;
virtual bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) = 0;
virtual bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) = 0;
virtual bool Stop() = 0;
virtual bool Finish(const uint32_t &timeout = kTimeoutInSeconds) = 0;

View File

@ -19,7 +19,9 @@
namespace mindspore {
namespace ps {
namespace core {
void NodeManager::InitNodeNum() { total_node_num_ = ClusterConfig::server_num() + ClusterConfig::worker_num(); }
void NodeManager::InitNodeNum() {
total_node_num_ = ClusterMetadata::instance()->server_num() + ClusterMetadata::instance()->worker_num();
}
int NodeManager::NextRankId(const RegisterMessage &register_message) {
std::lock_guard<std::mutex> lock(assign_rank_id_mutex_);
@ -92,7 +94,7 @@ void NodeManager::UpdateClusterState() {
(void)gettimeofday(&current_time, nullptr);
timeout_nodes_info_.clear();
for (auto it = heartbeats_.begin(); it != heartbeats_.end(); ++it) {
if (it->second.tv_sec + ClusterConfig::heartbeat_timeout() < current_time.tv_sec) {
if (it->second.tv_sec + ClusterMetadata::instance()->heartbeat_timeout() < current_time.tv_sec) {
MS_LOG(ERROR) << "The node id:" << it->first << " is timeout!";
timeout_nodes_info_[it->first] = nodes_info_[it->first];
}
@ -118,7 +120,7 @@ void NodeManager::UpdateClusterState() {
void NodeManager::CheckClusterTimeout() {
if (total_node_num_ != nodes_info_.size()) {
MS_LOG(WARNING) << "The cluster is not ready after " << ClusterConfig::cluster_available_timeout()
MS_LOG(WARNING) << "The cluster is not ready after " << ClusterMetadata::instance()->cluster_available_timeout()
<< " seconds,so finish the cluster, and change total node number from " << total_node_num_ << " to "
<< nodes_info_.size();
current_node_num_ = nodes_info_.size();

View File

@ -88,8 +88,8 @@ void SchedulerNode::InitCommandHandler() {
void SchedulerNode::CreateTcpServer() {
node_manager_.InitNodeNum();
std::string scheduler_host = ClusterConfig::scheduler_host();
uint32_t scheduler_port = ClusterConfig::scheduler_port();
std::string scheduler_host = ClusterMetadata::instance()->scheduler_host();
uint32_t scheduler_port = ClusterMetadata::instance()->scheduler_port();
server_ = std::make_shared<TcpServer>(scheduler_host, scheduler_port);
server_->SetMessageCallback([&](std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta,
const Protos &protos, const void *data, size_t size) {
@ -149,6 +149,10 @@ void SchedulerNode::ProcessFinish(std::shared_ptr<TcpServer> server, std::shared
void SchedulerNode::ProcessFetchServers(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, const void *data, size_t size) {
MS_EXCEPTION_IF_NULL(server);
MS_EXCEPTION_IF_NULL(conn);
MS_EXCEPTION_IF_NULL(meta);
MS_EXCEPTION_IF_NULL(data);
FetchServersRespMessage fetch_servers_message;
std::vector<ServersMeta> servers_meta_list = node_manager_.FetchServersMeta();
@ -164,20 +168,21 @@ void SchedulerNode::StartUpdateClusterStateTimer() {
auto start_time = std::chrono::steady_clock::now();
while (!is_finish_.load()) {
// 1. update cluster timeout
if (!node_manager_.is_cluster_ready() && (std::chrono::steady_clock::now() - start_time >
std::chrono::seconds(ClusterConfig::cluster_available_timeout()))) {
if (!node_manager_.is_cluster_ready() &&
(std::chrono::steady_clock::now() - start_time >
std::chrono::seconds(ClusterMetadata::instance()->cluster_available_timeout()))) {
node_manager_.CheckClusterTimeout();
}
// 2. update cluster state
std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval()));
std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval()));
node_manager_.UpdateClusterState();
if (node_manager_.is_cluster_ready()) {
is_ready_ = true;
wait_start_cond_.notify_all();
}
if (node_manager_.is_cluster_finish()) {
std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval() * 2));
std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval() * 2));
is_finish_ = true;
wait_finish_cond_.notify_all();
}

View File

@ -27,7 +27,7 @@
#include <mutex>
#include <unordered_map>
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "ps/core/tcp_client.h"
#include "ps/core/tcp_server.h"
#include "ps/core/node_manager.h"
@ -45,7 +45,7 @@ class SchedulerNode : public Node {
typedef void (SchedulerNode::*ResponseHandler)(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, const void *data, size_t size);
bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override;
bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override;
bool Stop() override;
bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override;

View File

@ -110,6 +110,12 @@ void ServerNode::ProcessSendData(std::shared_ptr<TcpConnection> conn, std::share
if (ret != 0) {
MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")";
}
MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< ", the node id is:" << node_info_.node_id_ << " send the request id is:" << meta->request_id()
<< " the current time is:"
<< std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now())
.time_since_epoch()
.count();
request_handler_(conn, meta, res, size);
}

View File

@ -25,7 +25,7 @@
#include <utility>
#include <vector>
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "ps/core/tcp_client.h"
#include "ps/core/tcp_server.h"
#include "ps/core/abstract_node.h"
@ -38,7 +38,7 @@ class ServerNode : public AbstractNode {
ServerNode() : server_(nullptr), server_thread_(nullptr) {}
~ServerNode() override;
bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override;
bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override;
bool Stop() override;
bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override;

View File

@ -88,6 +88,8 @@ void TcpClient::Init() {
MS_LOG(EXCEPTION) << "The tcp client ip:" << server_address_ << " is illegal!";
}
event_enable_debug_logging(EVENT_DBG_ALL);
event_set_log_callback(CommUtil::LogCallback);
int result = evthread_use_pthreads();
if (result != 0) {
MS_LOG(EXCEPTION) << "Use event pthread failed!";
@ -173,16 +175,11 @@ void TcpClient::ReadCallback(struct bufferevent *bev, void *ctx) {
MS_EXCEPTION_IF_NULL(bev);
MS_EXCEPTION_IF_NULL(ctx);
auto tcp_client = reinterpret_cast<TcpClient *>(ctx);
struct evbuffer *input = bufferevent_get_input(const_cast<struct bufferevent *>(bev));
MS_EXCEPTION_IF_NULL(input);
char read_buffer[kMessageChunkLength];
int read = 0;
while (EVBUFFER_LENGTH(input) > 0) {
int read = evbuffer_remove(input, &read_buffer, sizeof(read_buffer));
if (read == -1) {
MS_LOG(EXCEPTION) << "Can not drain data from the event buffer!";
}
while ((read = bufferevent_read(bev, &read_buffer, sizeof(read_buffer))) > 0) {
tcp_client->OnReadHandler(read_buffer, read);
}
}
@ -312,6 +309,10 @@ bool TcpClient::SendMessage(std::shared_ptr<MessageMeta> meta, const Protos &pro
MS_LOG(ERROR) << "Event buffer add protobuf data failed!";
res = false;
}
int result = bufferevent_flush(buffer_event_, EV_READ | EV_WRITE, BEV_FLUSH);
if (result < 0) {
MS_LOG(EXCEPTION) << "Bufferevent flush failed!";
}
bufferevent_unlock(buffer_event_);
return res;
}

View File

@ -32,7 +32,7 @@
#include <atomic>
#include <condition_variable>
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "utils/convert_utils_base.h"
namespace mindspore {
@ -53,7 +53,7 @@ class TcpClient {
std::string GetServerAddress() const;
void set_disconnected_callback(const OnDisconnected &disconnected);
void set_connected_callback(const OnConnected &connected);
bool WaitConnected(const uint32_t &connected_timeout = ClusterConfig::cluster_available_timeout());
bool WaitConnected(const uint32_t &connected_timeout = ClusterMetadata::instance()->cluster_available_timeout());
void Init();
void StartWithDelay(int seconds);
void Stop();

View File

@ -23,6 +23,8 @@
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <csignal>
#include <utility>
@ -90,7 +92,15 @@ bool TcpConnection::SendMessage(std::shared_ptr<MessageMeta> meta, const Protos
MS_LOG(ERROR) << "Event buffer add protobuf data failed!";
res = false;
}
int result = bufferevent_flush(buffer_event_, EV_READ | EV_WRITE, BEV_FLUSH);
if (result < 0) {
MS_LOG(EXCEPTION) << "Bufferevent flush failed!";
}
bufferevent_unlock(buffer_event_);
MS_LOG(DEBUG) << "SendMessage the request id is:" << meta->request_id() << " the current time is:"
<< std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now())
.time_since_epoch()
.count();
return res;
}
@ -136,6 +146,8 @@ void TcpServer::Init() {
MS_LOG(EXCEPTION) << "Use event pthread failed!";
}
event_enable_debug_logging(EVENT_DBG_ALL);
event_set_log_callback(CommUtil::LogCallback);
is_stop_ = false;
base_ = event_base_new();
MS_EXCEPTION_IF_NULL(base_);
@ -284,7 +296,7 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st
std::shared_ptr<TcpConnection> conn = server->onCreateConnection(bev, fd);
MS_EXCEPTION_IF_NULL(conn);
SetTcpNoDelay(fd);
server->AddConnection(fd, conn);
conn->InitConnection([=](std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) {
OnServerReceiveMessage on_server_receive = server->GetServerReceive();
@ -337,6 +349,11 @@ void TcpServer::ReadCallback(struct bufferevent *bev, void *connection) {
MS_LOG(EXCEPTION) << "Can not drain data from the event buffer!";
}
conn->OnReadHandler(read_buffer, IntToSize(read));
MS_LOG(DEBUG) << "the current time is:"
<< std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now())
.time_since_epoch()
.count()
<< " the read size is:" << read;
}
}
@ -388,6 +405,14 @@ void TcpServer::TimerOnceCallback(evutil_socket_t, int16_t, void *arg) {
}
}
void TcpServer::SetTcpNoDelay(const evutil_socket_t &fd) {
const int one = 1;
int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int));
if (ret < 0) {
MS_LOG(EXCEPTION) << "Set socket no delay failed!";
}
}
bool TcpServer::SendMessage(std::shared_ptr<TcpConnection> conn, std::shared_ptr<CommMessage> message) {
MS_EXCEPTION_IF_NULL(conn);
MS_EXCEPTION_IF_NULL(message);

View File

@ -35,7 +35,7 @@
#include <atomic>
#include "ps/core/tcp_message_handler.h"
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "utils/convert_utils_base.h"
namespace mindspore {
@ -117,6 +117,7 @@ class TcpServer {
static void EventCallback(struct bufferevent *, std::int16_t events, void *server);
static void TimerCallback(evutil_socket_t fd, int16_t event, void *arg);
static void TimerOnceCallback(evutil_socket_t fd, int16_t event, void *arg);
static void SetTcpNoDelay(const evutil_socket_t &fd);
std::shared_ptr<TcpConnection> onCreateConnection(struct bufferevent *bev, const evutil_socket_t &fd);
struct event_base *base_;

View File

@ -24,7 +24,7 @@
#include <utility>
#include <algorithm>
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
#include "ps/core/tcp_client.h"
#include "ps/core/tcp_server.h"
#include "ps/core/abstract_node.h"
@ -37,7 +37,7 @@ class WorkerNode : public AbstractNode {
WorkerNode() = default;
~WorkerNode() override;
bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override;
bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override;
bool Stop() override;
bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override;

View File

@ -31,8 +31,8 @@ class TestClusterAvailableTimeout : public UT::Common {
};
TEST_F(TestClusterAvailableTimeout, TestClusterAvailableTimeout) {
ClusterConfig::Init(1, 1, "127.0.0.1", 9999);
ClusterConfig::set_cluster_available_timeout(3);
ClusterMetadata::instance()->Init(1, 1, "127.0.0.1", 9999);
ClusterMetadata::instance()->set_cluster_available_timeout(3);
SchedulerNode node;
node.Start();
node.Finish();

View File

@ -18,27 +18,27 @@
#include <string>
#include "common/common_test.h"
#include "ps/core/cluster_config.h"
#include "ps/core/cluster_metadata.h"
namespace mindspore {
namespace ps {
namespace core {
class TestClusterConfig : public UT::Common {
class TestClusterMetadata : public UT::Common {
public:
TestClusterConfig() = default;
virtual ~TestClusterConfig() = default;
TestClusterMetadata() = default;
virtual ~TestClusterMetadata() = default;
void SetUp() override {}
void TearDown() override {}
};
TEST_F(TestClusterConfig, HeartbeatInterval) {
ClusterConfig::Init(2, 2, "127.0.0.1", 8080);
EXPECT_TRUE(ClusterConfig::heartbeat_interval() == 3);
ClusterConfig::set_heartbeat_interval(100);
EXPECT_TRUE(ClusterConfig::heartbeat_interval() == 100);
EXPECT_STREQ(ClusterConfig::scheduler_host().c_str(), "127.0.0.1");
EXPECT_TRUE(ClusterConfig::scheduler_port() == 8080);
TEST_F(TestClusterMetadata, HeartbeatInterval) {
ClusterMetadata::instance()->Init(2, 2, "127.0.0.1", 8080);
EXPECT_TRUE(ClusterMetadata::instance()->heartbeat_interval() == 3);
ClusterMetadata::instance()->set_heartbeat_interval(100);
EXPECT_TRUE(ClusterMetadata::instance()->heartbeat_interval() == 100);
EXPECT_STREQ(ClusterMetadata::instance()->scheduler_host().c_str(), "127.0.0.1");
EXPECT_TRUE(ClusterMetadata::instance()->scheduler_port() == 8080);
}
} // namespace core
} // namespace ps

View File

@ -53,7 +53,7 @@ TEST_F(TestCommUtil, GetAvailableInterfaceAndIP) {
}
TEST_F(TestCommUtil, ValidateRankId) {
ClusterConfig::Init(3, 2, "127.0.0.1", 9999);
ClusterMetadata::instance()->Init(3, 2, "127.0.0.1", 9999);
EXPECT_TRUE(CommUtil::ValidateRankId(NodeRole::WORKER, 2));
EXPECT_FALSE(CommUtil::ValidateRankId(NodeRole::WORKER, 3));
EXPECT_TRUE(CommUtil::ValidateRankId(NodeRole::SERVER, 1));

View File

@ -62,6 +62,8 @@ class TestHttpClient : public UT::Common {
if (memcpy_s(post_message, len, data, len) != 0) {
MS_LOG(EXCEPTION) << "The memset_s error";
}
MS_LOG(WARNING) << "The path param:" << path_param;
MS_LOG(WARNING) << "The header param:" << header_param;
EXPECT_STREQ(path_param.c_str(), "value1");
EXPECT_STREQ(header_param.c_str(), "headerValue");
EXPECT_STREQ(post_message, "postKey=postValue");

View File

@ -95,9 +95,10 @@ class TestHttpServer : public UT::Common {
if (memcpy_s(post_message, len, data, len) != 0) {
MS_LOG(EXCEPTION) << "The memset_s error";
}
MS_LOG(WARNING) << "The Path param:" << path_param;
MS_LOG(WARNING) << "The header param:" << header_param;
EXPECT_STREQ(path_param.c_str(), "value1");
EXPECT_STREQ(header_param.c_str(), "headerValue");
EXPECT_STREQ(post_param.c_str(), "postValue");
EXPECT_STREQ(post_message, "postKey=postValue");
const std::string rKey("headKey");
@ -127,7 +128,7 @@ class TestHttpServer : public UT::Common {
std::unique_ptr<HttpServer> server_;
};
TEST_F(TestHttpServer, httpGetQequest) {
TEST_F(TestHttpServer, httpGetRequest) {
char buffer[100];
FILE *file;
std::string cmd = "curl -X GET http://127.0.0.1:9999/httpget?key1=value1";