Sync step at data prepare phase.

This commit is contained in:
ZPaC 2022-12-15 15:31:15 +08:00
parent 9ef3ac407e
commit 66638014d5
9 changed files with 30 additions and 13 deletions

View File

@ -28,6 +28,7 @@
#include "include/common/utils/convert_utils.h"
#include "distributed/recovery/recovery_context.h"
#if defined(__linux__) && defined(WITH_BACKEND)
#include "runtime/graph_scheduler/rpc_node_scheduler.h"
#include "runtime/graph_scheduler/embedding_cache_scheduler.h"
#endif
@ -421,6 +422,12 @@ void DataPrepareActor::SetInitTensorsIfNeeded(const std::vector<std::vector<Tens
void DataPrepareActor::PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors,
OpContext<DeviceTensor> *const context, GraphExecutionStrategy real_strategy) {
MS_EXCEPTION_IF_NULL(context);
#if defined(__linux__) && defined(WITH_BACKEND)
// Update rpc actors' status.
RpcActorStatusUpdater::GetInstance().UpdateRpcActorStatus();
#endif
try {
// Preprocess before prepare data for data prepare actor.
PreprocessBeforePrepareData();

View File

@ -108,8 +108,8 @@ void LoopCountActor::SendOutput(OpContext<DeviceTensor> *const context) {
}
#if defined(__linux__) && defined(WITH_BACKEND)
// Update rpc actors' status.
RpcActorStatusUpdater::GetInstance().UpdateRpcActorStatus();
// Flush sent data after each step is done.
RpcActorStatusUpdater::GetInstance().FlushRpcData();
#endif
// The LoopCountActor exits.

View File

@ -41,8 +41,6 @@ void RecvActor::SetOpcontext(OpContext<DeviceTensor> *const op_context) {
std::unique_lock<std::mutex> lock(context_mtx_);
MS_EXCEPTION_IF_NULL(op_context);
op_context_ = op_context;
is_context_valid_ = true;
context_cv_.notify_all();
}
void RecvActor::ResetOpcontext() {

View File

@ -72,9 +72,12 @@ class RpcActor : public KernelActor {
// each sinked loop is done in case rpc actors visit the invalid op context.
virtual void ResetOpcontext() {}
// Update rpc actor status, for example update the ready status of recv actors.
// The default implementation is a no-op; it is virtual so that derived actors can override it.
virtual void UpdateStatus() {}
// Flush data for rpc actor, for example flush sent data for send actors.
// The default implementation is a no-op; it is virtual so that derived actors can override it.
virtual void FlushData() {}
// Set the actor route proxy for rpc actors.
void set_actor_route_table_proxy(const ActorRouteTableProxyPtr &proxy);

View File

@ -66,7 +66,7 @@ bool SendActor::ConnectServer() {
return true;
}
void SendActor::UpdateStatus() {
void SendActor::FlushData() {
if (!client_->Flush(server_url_)) {
MS_LOG(EXCEPTION) << "Failed to flush client for server " << server_url_;
}

View File

@ -46,7 +46,7 @@ class SendActor : public RpcActor {
// Lookup peer actors' route and create connection to them.
bool ConnectServer();
void UpdateStatus() override;
void FlushData() override;
protected:
// Do real send operation in this method.

View File

@ -273,15 +273,21 @@ void RpcActorStatusUpdater::set_rpc_actors(const RpcActorSetPtr &rpc_actors) {
}
void RpcActorStatusUpdater::UpdateRpcActorStatus() const {
// Update status for rpc actors to control their execution orders.
// Update status for recv actors to control their execution orders.
if (rpc_actors_.lock() != nullptr) {
for (auto &recv_actor : rpc_actors_.lock()->recv_actors_) {
MS_EXCEPTION_IF_NULL(recv_actor);
recv_actor->UpdateStatus();
}
}
}
void RpcActorStatusUpdater::FlushRpcData() const {
// Flush data for send actors.
if (rpc_actors_.lock() != nullptr) {
for (auto &send_actor : rpc_actors_.lock()->send_actors_) {
MS_EXCEPTION_IF_NULL(send_actor);
send_actor->UpdateStatus();
send_actor->FlushData();
}
}
}

View File

@ -93,9 +93,9 @@ class RpcActorOpContextSetter {
};
// This class is used to refresh the state of the rpc actor. For example, the mux recv actor receives requests for
// the service process. Currently, the requests are processed serially. After each request (that is, the execution of an
// actor dag) ends, the state of the Recv actor needs to be refreshed. Make it in the ready state to continue with the
// next request.
// the service process. Currently, the requests are processed serially. Before each request (that is, the execution of
// an actor dag) begins, the state of the Recv actor needs to be refreshed so that it is in the ready state and can
// continue with the next request.
class RpcActorStatusUpdater {
public:
static RpcActorStatusUpdater &GetInstance();
@ -106,6 +106,9 @@ class RpcActorStatusUpdater {
// Update rpc actors' status.
void UpdateRpcActorStatus() const;
// Sent data should be flushed after each step.
void FlushRpcData() const;
private:
RpcActorStatusUpdater() = default;
~RpcActorStatusUpdater() = default;

View File

@ -289,7 +289,7 @@ def test_compile_cache_lenet_change_dir():
shutil.rmtree(new_path, ignore_errors=True)
@pytest.mark.level1
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_onecard