fix core dump issue caused by heartbeat

This commit is contained in:
sabrinasun 2021-08-16 21:41:01 -04:00
parent 9f42da3d7c
commit 07009fbbe8
2 changed files with 19 additions and 7 deletions

View File

@@ -79,6 +79,7 @@ Debugger::Debugger()
is_dataset_graph_(false), is_dataset_graph_(false),
partial_memory_(false), partial_memory_(false),
initial_suspend_(true), initial_suspend_(true),
enable_heartbeat_(false),
not_dataset_graph_sum_(0), not_dataset_graph_sum_(0),
version_("") { version_("") {
CheckDebuggerEnabledParam(); CheckDebuggerEnabledParam();
@@ -131,6 +132,7 @@ void Debugger::EnableDebugger() {
// reset some of the class members // reset some of the class members
num_step_ = 0; num_step_ = 0;
debugger_enabled_ = false; debugger_enabled_ = false;
enable_heartbeat_ = false;
partial_memory_ = false; partial_memory_ = false;
grpc_client_ = nullptr; grpc_client_ = nullptr;
debug_services_ = nullptr; debug_services_ = nullptr;
@@ -188,7 +190,7 @@ void Debugger::EnableDebugger() {
// initialize grpc client // initialize grpc client
grpc_client_ = std::make_unique<GrpcClient>(host, port); grpc_client_ = std::make_unique<GrpcClient>(host, port);
// initialize sending heartbeat // initialize sending heartbeat
heartbeat_thread_ = std::make_unique<std::thread>([&]() { SendHeartbeat(heartbeat_period_second); }); heartbeat_thread_ = std::make_unique<std::thread>([=]() { SendHeartbeat(heartbeat_period_second); });
} }
debug_services_ = std::make_unique<DebugServices>(); debug_services_ = std::make_unique<DebugServices>();
} }
@@ -582,7 +584,6 @@ GraphProto Debugger::GetGraphProto(const KernelGraphPtr &graph_ptr) const {
} }
void Debugger::SendHeartbeat(int32_t period) { void Debugger::SendHeartbeat(int32_t period) {
bool heartbeat_enabled = true;
int num_heartbeat_fail = 0; int num_heartbeat_fail = 0;
const int max_num_heartbeat_fail = 5; const int max_num_heartbeat_fail = 5;
const int retry_milliseconds = 500; const int retry_milliseconds = 500;
@@ -591,8 +592,8 @@ void Debugger::SendHeartbeat(int32_t period) {
heartbeat.set_message("Debugger is alive"); heartbeat.set_message("Debugger is alive");
heartbeat.set_period(heartbeat_period_second); heartbeat.set_period(heartbeat_period_second);
bool run = CheckDebuggerEnabled() && heartbeat_enabled; SetEnableHeartbeat(CheckDebuggerEnabled());
while (run) { while (enable_heartbeat_) {
EventReply reply = grpc_client_->SendHeartbeat(heartbeat); EventReply reply = grpc_client_->SendHeartbeat(heartbeat);
if (reply.status() != reply.OK) { if (reply.status() != reply.OK) {
@@ -600,8 +601,8 @@ void Debugger::SendHeartbeat(int32_t period) {
num_heartbeat_fail++; num_heartbeat_fail++;
if (num_heartbeat_fail >= max_num_heartbeat_fail) { if (num_heartbeat_fail >= max_num_heartbeat_fail) {
MS_LOG(ERROR) << "Maximum number of failure for SendHeartbeat reached : exiting training session."; MS_LOG(ERROR) << "Maximum number of failure for SendHeartbeat reached : exiting training session.";
Exit(); SetEnableHeartbeat(false);
run = false; break;
} else { } else {
MS_LOG(ERROR) << "Number of consecutive SendHeartbeat fail:" << num_heartbeat_fail; MS_LOG(ERROR) << "Number of consecutive SendHeartbeat fail:" << num_heartbeat_fail;
std::this_thread::sleep_for(std::chrono::milliseconds(retry_milliseconds)); std::this_thread::sleep_for(std::chrono::milliseconds(retry_milliseconds));
@@ -943,9 +944,15 @@ std::list<TensorProto> Debugger::LoadTensors(const ProtoVector<TensorProto> &ten
} }
return tensor_list; return tensor_list;
} }
void Debugger::Exit() { void Debugger::Exit() {
// clear resource before exit // clear resource before exit
// debugger will notify main thread to exit because main thread can only exit at step boundary // debugger will notify main thread to exit because main thread can only exit at step boundary.
SetEnableHeartbeat(false);
if (heartbeat_thread_ && heartbeat_thread_->joinable()) {
heartbeat_thread_->join();
MS_LOG(INFO) << "Join Heartbeat thread.";
}
pipeline::ExecutorPy::DebugTerminate(true); pipeline::ExecutorPy::DebugTerminate(true);
} }
@@ -1136,6 +1143,8 @@ bool GetMiVersionMatched(const EventReply &reply) { return reply.version_matched
bool Debugger::partial_memory() const { return partial_memory_; } bool Debugger::partial_memory() const { return partial_memory_; }
void Debugger::SetEnableHeartbeat(bool enabled) { enable_heartbeat_ = enabled; }
void Debugger::SetCurNode(const std::string &cur_name) { void Debugger::SetCurNode(const std::string &cur_name) {
// access lock for public method // access lock for public method
std::lock_guard<std::mutex> a_lock(access_lock_); std::lock_guard<std::mutex> a_lock(access_lock_);

View File

@@ -124,6 +124,8 @@ class Debugger : public std::enable_shared_from_this<Debugger> {
bool partial_memory() const; bool partial_memory() const;
void SetEnableHeartbeat(bool enabled);
void SetCurNode(const std::string &cur_name); void SetCurNode(const std::string &cur_name);
std::string run_level() const; std::string run_level() const;
@@ -263,6 +265,7 @@ class Debugger : public std::enable_shared_from_this<Debugger> {
std::mutex access_lock_; std::mutex access_lock_;
// flag to keep track of the very first suspension of debugger // flag to keep track of the very first suspension of debugger
bool initial_suspend_; bool initial_suspend_;
bool enable_heartbeat_;
std::list<GraphProto> graph_proto_list_; std::list<GraphProto> graph_proto_list_;
std::list<KernelGraphPtr> graph_ptr_list_; std::list<KernelGraphPtr> graph_ptr_list_;