From 66638014d56795a610e801f8f1fcd8fc74c076ac Mon Sep 17 00:00:00 2001 From: ZPaC Date: Thu, 15 Dec 2022 15:31:15 +0800 Subject: [PATCH] Sync step at data prepare phase. --- .../graph_scheduler/actor/data_prepare_actor.cc | 7 +++++++ .../runtime/graph_scheduler/actor/loop_count_actor.cc | 4 ++-- .../runtime/graph_scheduler/actor/rpc/recv_actor.cc | 2 -- .../runtime/graph_scheduler/actor/rpc/rpc_actor.h | 5 ++++- .../runtime/graph_scheduler/actor/rpc/send_actor.cc | 2 +- .../runtime/graph_scheduler/actor/rpc/send_actor.h | 2 +- .../runtime/graph_scheduler/rpc_node_scheduler.cc | 10 ++++++++-- .../ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h | 9 ++++++--- tests/st/frontend_compile_cache/test_compile_cache.py | 2 +- 9 files changed, 30 insertions(+), 13 deletions(-) diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/data_prepare_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/data_prepare_actor.cc index 2004871ceec..d04a47ef4c9 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/data_prepare_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/data_prepare_actor.cc @@ -28,6 +28,7 @@ #include "include/common/utils/convert_utils.h" #include "distributed/recovery/recovery_context.h" #if defined(__linux__) && defined(WITH_BACKEND) +#include "runtime/graph_scheduler/rpc_node_scheduler.h" #include "runtime/graph_scheduler/embedding_cache_scheduler.h" #endif @@ -421,6 +422,12 @@ void DataPrepareActor::SetInitTensorsIfNeeded(const std::vector> &input_tensors, OpContext *const context, GraphExecutionStrategy real_strategy) { MS_EXCEPTION_IF_NULL(context); + +#if defined(__linux__) && defined(WITH_BACKEND) + // Update rpc actors' status. + RpcActorStatusUpdater::GetInstance().UpdateRpcActorStatus(); +#endif + try { // Preprocess before prepare data for data prepare actor. PreprocessBeforePrepareData(); diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/loop_count_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/loop_count_actor.cc index 24e79dd0553..e5d3fb4224e 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/loop_count_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/loop_count_actor.cc @@ -108,8 +108,8 @@ void LoopCountActor::SendOutput(OpContext *const context) { } #if defined(__linux__) && defined(WITH_BACKEND) - // Update rpc actors' status. - RpcActorStatusUpdater::GetInstance().UpdateRpcActorStatus(); + // Flush sent data after each step is done. + RpcActorStatusUpdater::GetInstance().FlushRpcData(); #endif // The LoopCountActor exits. diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc index c18b4672040..f0cfc430f9b 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc @@ -41,8 +41,6 @@ void RecvActor::SetOpcontext(OpContext *const op_context) { std::unique_lock lock(context_mtx_); MS_EXCEPTION_IF_NULL(op_context); op_context_ = op_context; - is_context_valid_ = true; - context_cv_.notify_all(); } void RecvActor::ResetOpcontext() { diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h index 769cfdcd314..65fc5f227d7 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h @@ -72,9 +72,12 @@ class RpcActor : public KernelActor { // each sinked loop is done in case rpc actors visit the invalid op context. virtual void ResetOpcontext() {} - // Update rpc actor status, for example update ready status of mux recv actors. + // Update rpc actor status, for example update ready status of recv actors. virtual void UpdateStatus() {} + // Flush data for rpc actor, for example flush sent data for send actors. + virtual void FlushData() {} + // Set the actor route proxy for rpc actors. void set_actor_route_table_proxy(const ActorRouteTableProxyPtr &proxy); diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.cc index 0d729ac913e..60438d7c76c 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.cc @@ -66,7 +66,7 @@ bool SendActor::ConnectServer() { return true; } -void SendActor::UpdateStatus() { +void SendActor::FlushData() { if (!client_->Flush(server_url_)) { MS_LOG(EXCEPTION) << "Failed to flush client for server " << server_url_; } diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.h b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.h index a33945575a3..cb41c8aeccc 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/send_actor.h @@ -46,7 +46,7 @@ class SendActor : public RpcActor { // Lookup peer actors' route and create connection to them. bool ConnectServer(); - void UpdateStatus() override; + void FlushData() override; protected: // Do real send operation in this method. diff --git a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc index 13516275b36..5b77d0b57c1 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc @@ -273,15 +273,21 @@ void RpcActorStatusUpdater::set_rpc_actors(const RpcActorSetPtr &rpc_actors) { } void RpcActorStatusUpdater::UpdateRpcActorStatus() const { - // Update status for rpc actors to control their execution orders. + // Update status for recv actors to control their execution orders. if (rpc_actors_.lock() != nullptr) { for (auto &recv_actor : rpc_actors_.lock()->recv_actors_) { MS_EXCEPTION_IF_NULL(recv_actor); recv_actor->UpdateStatus(); } + } +} + +void RpcActorStatusUpdater::FlushRpcData() const { + // Flush data for send actors. + if (rpc_actors_.lock() != nullptr) { for (auto &send_actor : rpc_actors_.lock()->send_actors_) { MS_EXCEPTION_IF_NULL(send_actor); - send_actor->UpdateStatus(); + send_actor->FlushData(); } } } diff --git a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h index 5a0c653b10e..66821ca80c4 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h @@ -93,9 +93,9 @@ class RpcActorOpContextSetter { }; // This class is used to refresh the state of the rpc actor. For example, the mux recv actor receives requests for -// the service process. Currently, the requests are processed serially. After each request (that is, the execution of an -// actor dag) ends, the state of the Recv actor needs to be refreshed. Make it in the ready state to continue with the -// next request. +// the service process. Currently, the requests are processed serially. Before each request (that is, the execution of +// an actor dag) begins, the state of the Recv actor needs to be refreshed. Make it in the ready state to continue with +// the next request. class RpcActorStatusUpdater { public: static RpcActorStatusUpdater &GetInstance(); @@ -106,6 +106,9 @@ class RpcActorStatusUpdater { // Update rpc actors' status. void UpdateRpcActorStatus() const; + // Sent data should be flushed after each step. + void FlushRpcData() const; + private: RpcActorStatusUpdater() = default; ~RpcActorStatusUpdater() = default; diff --git a/tests/st/frontend_compile_cache/test_compile_cache.py b/tests/st/frontend_compile_cache/test_compile_cache.py index e58b0fb9638..726a00cadfb 100644 --- a/tests/st/frontend_compile_cache/test_compile_cache.py +++ b/tests/st/frontend_compile_cache/test_compile_cache.py @@ -289,7 +289,7 @@ def test_compile_cache_lenet_change_dir(): shutil.rmtree(new_path, ignore_errors=True) -@pytest.mark.level1 +@pytest.mark.level0 @pytest.mark.platform_x86_ascend_training @pytest.mark.platform_arm_ascend_training @pytest.mark.env_onecard