!47512 Manually finalize tcp client and server instead of relying on the destructor

Merge pull request !47512 from ZPaC/clear-rpc-sched
This commit is contained in:
i-robot 2023-01-06 09:35:42 +00:00 committed by Gitee
commit 16272c6e71
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
8 changed files with 51 additions and 0 deletions

View File

@ -34,6 +34,7 @@ RecvActor::~RecvActor() {
} catch (const std::exception &) {
MS_LOG(ERROR) << "Failed to finalize for tcp server in recv actor.";
}
server_ = nullptr;
}
}
@ -102,6 +103,13 @@ bool RecvActor::StartServer() {
return true;
}
// Finalize the tcp server owned by this recv actor and drop the reference to it.
// When server_ is not set this is a no-op, so calling Clear() again after a successful call does nothing.
void RecvActor::Clear() {
  if (server_ == nullptr) {
    return;
  }

  server_->Finalize();
  server_ = nullptr;
}
void RecvActor::StopRpcAtException() {
std::unique_lock<std::mutex> lock(context_mtx_);
if (!is_context_valid_) {

View File

@ -62,6 +62,9 @@ class RecvActor : public RpcActor {
// Start recv actor server and register this server address to actor route table in scheduler by proxy.
bool StartServer();
// Finalize tcp server.
void Clear() override;
void StopRpcAtException() override;
protected:

View File

@ -88,6 +88,9 @@ class RpcActor : public KernelActor {
// The base implementation is a no-op that ignores all of its arguments.
virtual void SetRouteInfo(uint32_t peer_rank, const std::string &peer_role, const std::string &src_node_name,
const std::string &dst_node_name) {}
// Clear the resources held by the rpc actor. The base implementation does nothing; RecvActor and SendActor
// override it to finalize their tcp server and tcp client respectively.
virtual void Clear() {}
/**
* @description: Stop rpc communication to avoid dead lock after exception is thrown.
* @return {void}

View File

@ -29,6 +29,7 @@ SendActor::~SendActor() {
} catch (const std::exception &) {
MS_LOG(ERROR) << "Failed to disconnect and finalize for tcp client in send actor.";
}
client_ = nullptr;
}
}
@ -72,6 +73,14 @@ void SendActor::FlushData() {
}
}
// Disconnect the tcp client of this send actor from server_url_, finalize it and drop the reference to it.
// When client_ is not set this is a no-op.
void SendActor::Clear() {
  if (client_ == nullptr) {
    return;
  }

  // The result of Disconnect is intentionally discarded; finalization proceeds regardless of it.
  (void)client_->Disconnect(server_url_);
  client_->Finalize();
  client_ = nullptr;
}
bool SendActor::LaunchKernel(OpContext<DeviceTensor> *const context) {
MS_ERROR_IF_NULL_W_RET_VAL(context, false);
// Set context for later usage in FreeMessage.

View File

@ -46,8 +46,12 @@ class SendActor : public RpcActor {
// Lookup peer actors' route and create connection to them.
bool ConnectServer();
// Flush and wait for sent data to be passed to kernel.
void FlushData() override;
// Disconnect from the server and finalize tcp client.
void Clear() override;
protected:
// Do real send operation in this method.
bool LaunchKernel(OpContext<DeviceTensor> *const context) override;

View File

@ -335,6 +335,12 @@ void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<Kernel
}
void GraphScheduler::Clear() {
#ifdef ENABLE_RPC_ACTOR
if (rpc_node_scheduler_ != nullptr) {
rpc_node_scheduler_->Clear();
}
#endif
// Terminate all actors.
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);

View File

@ -217,6 +217,21 @@ void RpcNodeScheduler::ResetOpcontext(const RpcActorSetPtr &rpc_actors) {
op_context_ = nullptr;
}
// Call Clear() on every recv actor and then on every send actor held in rpc_actors_.
// Nothing is done (and nothing is logged) when rpc_actors_ is null.
// A null actor in either list raises through MS_EXCEPTION_IF_NULL before any later actor is cleared.
void RpcNodeScheduler::Clear() {
  if (rpc_actors_ == nullptr) {
    return;
  }

  MS_LOG(INFO) << "Start finalizing tcp server and client for rpc actors.";

  // Recv actors are cleared first, then send actors.
  for (const auto &receiver : rpc_actors_->recv_actors_) {
    MS_EXCEPTION_IF_NULL(receiver);
    receiver->Clear();
  }
  for (const auto &sender : rpc_actors_->send_actors_) {
    MS_EXCEPTION_IF_NULL(sender);
    sender->Clear();
  }

  MS_LOG(INFO) << "End finalizing tcp server and client for rpc actors.";
}
void RpcNodeScheduler::Abort() {
MS_LOG(INFO) << "Start aborting rpc actors.";
MS_ERROR_IF_NULL_WO_RET_VAL(rpc_actors_);

View File

@ -49,6 +49,9 @@ class RpcNodeScheduler {
// Reset op context for rpc actors.
void ResetOpcontext(const RpcActorSetPtr &rpc_actors);
// Clear the resources of rpc actors by finalizing their tcp clients and servers.
void Clear();
// Abort rpc communication. This is usually called when the cluster exits with exception.
void Abort();