From 84ae4fce73f2e025f78b80b799662ef2a61aaf39 Mon Sep 17 00:00:00 2001 From: yangwei Date: Sat, 7 May 2022 11:06:02 +0800 Subject: [PATCH] RunGraphAsync --- .../include/transform/graph_ir/graph_runner.h | 2 + .../ccsrc/include/transform/graph_ir/util.h | 9 ++ mindspore/ccsrc/pipeline/jit/pipeline.cc | 2 +- mindspore/ccsrc/pipeline/jit/pipeline_ge.cc | 59 +++++++++--- .../ccsrc/transform/graph_ir/graph_runner.cc | 93 +++++++++++++++++++ mindspore/ccsrc/transform/graph_ir/util.cc | 12 +++ 6 files changed, 162 insertions(+), 15 deletions(-) diff --git a/mindspore/ccsrc/include/transform/graph_ir/graph_runner.h b/mindspore/ccsrc/include/transform/graph_ir/graph_runner.h index d1b6d2d073f..61acf87bfeb 100644 --- a/mindspore/ccsrc/include/transform/graph_ir/graph_runner.h +++ b/mindspore/ccsrc/include/transform/graph_ir/graph_runner.h @@ -51,6 +51,8 @@ class COMMON_EXPORT GraphRunner { ~GraphRunner() { sess_ = nullptr; } Status RunGraph(const RunOptions &options, const std::vector &inputs, std::vector *outputs); Status RunGraph(const RunOptions &options, const std::vector &inputs, std::vector *outputs); + Status RunGraph(const RunOptions &options, const std::vector &inputs, std::vector *outputs, + const std::vector &me_types); static std::shared_ptr NewSession(const SessionOptions &sess_options); private: diff --git a/mindspore/ccsrc/include/transform/graph_ir/util.h b/mindspore/ccsrc/include/transform/graph_ir/util.h index 5416e07702a..72d11784790 100644 --- a/mindspore/ccsrc/include/transform/graph_ir/util.h +++ b/mindspore/ccsrc/include/transform/graph_ir/util.h @@ -105,6 +105,15 @@ class COMMON_EXPORT TransformUtil { * */ static MeTensorPtr ConvertGeTensor(const GeTensorPtr &tensor); + /* + * Parameters: + * tensor: [GeTensor] the data tensor in GE + * me_type: [TypeId] the type of created Me tensor + * Return: + * [MeTensor] the data tensor in ME + * */ + static MeTensorPtr ConvertGeTensor(const GeTensorPtr &tensor, const TypeId &me_type); + /* * Parameters: * tensor: [GeTensor] the data tensor in GE diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.cc b/mindspore/ccsrc/pipeline/jit/pipeline.cc index 94b4c74ea08..6f43a067f6e 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline.cc @@ -1539,10 +1539,10 @@ void InitPipeline() { // open tsd before ge initialize auto ms_context = MsContext::GetInstance(); MS_EXCEPTION_IF_NULL(ms_context); + (void)context::InitGe(ms_context); if (!context::OpenTsd(ms_context)) { MS_LOG(EXCEPTION) << "Open tsd failed"; } - (void)context::InitGe(ms_context); } void FinalizeBackend() { diff --git a/mindspore/ccsrc/pipeline/jit/pipeline_ge.cc b/mindspore/ccsrc/pipeline/jit/pipeline_ge.cc index 0c9dad73aa7..8b42b105e68 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline_ge.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline_ge.cc @@ -38,6 +38,7 @@ namespace pipeline { using Tensor = mindspore::tensor::Tensor; using MetaTensor = mindspore::tensor::MetaTensor; using TensorOrderMap = std::map>; +using mindspore::abstract::AbstractScalar; using mindspore::abstract::AbstractTensor; using mindspore::abstract::AbstractTuple; using mindspore::abstract::AbstractTuplePtr; @@ -397,14 +398,36 @@ py::object StructureOutput(const AnfNodePtr &output_node, const py::tuple &data, return ExtractGeneralCnodeRet(output_c->abstract(), data, count); } -std::shared_ptr DoExecGraph(const FuncGraphPtr &graph, const std::vector &inputs, - const std::string &phase) { +void GetMeRetDataType(const AbstractBasePtr &cnode_data, std::vector *me_types) { + MS_EXCEPTION_IF_NULL(cnode_data); + + if (cnode_data->isa()) { + TypeId me_type = cnode_data->BuildType()->type_id(); + if (me_type == kObjectTypeTensorType) { + me_type = dyn_cast(cnode_data->BuildType())->element()->type_id(); + me_types->emplace_back(me_type); + } + return; + } + if (cnode_data->isa()) { + TypeId me_type = cnode_data->BuildType()->type_id(); + me_types->emplace_back(me_type); + } + auto abstract_tuple = cnode_data->cast(); + MS_EXCEPTION_IF_NULL(abstract_tuple); + auto elements = abstract_tuple->elements(); + for (size_t i = 0; i < abstract_tuple->size(); ++i) { + GetMeRetDataType(elements[i], me_types); + } +} + +std::shared_ptr DoExecGraphAsync(const FuncGraphPtr &graph, const std::vector &inputs, + const std::string &phase) { std::vector ge_tensors = TransformUtil::ConvertInputTensors(inputs, kOpFormat_NCHW); if (ge_tensors.size() != inputs.size()) { MS_LOG(EXCEPTION) << "Convert me args to ge tensor error."; } - std::vector ge_outputs; transform::RunOptions run_options; run_options.name = phase; auto graph_runner = DfGraphManager::GetInstance().GetGraphRunner(); @@ -412,20 +435,31 @@ std::shared_ptr DoExecGraph(const FuncGraphPtr &graph, const std::ve MS_LOG(EXCEPTION) << "Can not found GraphRunner."; } + AnfNodePtr output_node = graph->get_return()->input(1); + MS_EXCEPTION_IF_NULL(output_node); + std::vector me_types; + auto output_c = output_node->cast()->abstract(); + // get output node data types + GetMeRetDataType(output_c, &me_types); + + std::vector me_outputs; { // Release GIL before calling into (potentially long-running) C++ code py::gil_scoped_release release; MS_LOG(DEBUG) << "Run graph begin, inputs size is: " << inputs.size(); - Status ret = graph_runner->RunGraph(run_options, ge_tensors, &ge_outputs); - MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << ge_outputs.size(); - if (ret != Status::SUCCESS) { - MS_LOG(ERROR) << "Exec graph failed"; - return nullptr; + try { + Status ret = graph_runner->RunGraph(run_options, ge_tensors, &me_outputs, me_types); + MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << me_outputs.size(); + if (ret != Status::SUCCESS) { + MS_LOG(ERROR) << "Exec graph failed"; + return nullptr; + } + } catch (const std::exception &ex) { + throw(ex); } } - std::vector me_outputs = TransformUtil::ConvertGeTensors(ge_outputs); - if (me_outputs.size() != ge_outputs.size()) { + if (me_outputs.size() != me_types.size()) { MS_LOG(WARNING) << "Convert output Ge tensor to Me tensor failed"; } @@ -435,9 +469,6 @@ std::shared_ptr DoExecGraph(const FuncGraphPtr &graph, const std::ve } std::shared_ptr ret = nullptr; - - AnfNodePtr output_node = graph->get_return()->input(1); - MS_EXCEPTION_IF_NULL(output_node); size_t count = 0; py::object oj = StructureOutput(output_node, outputs, &count); ret = std::make_shared(oj); @@ -501,7 +532,7 @@ py::object ExecDFGraph(const std::map &info, const std::vector inputs; ProcessGeArg(info, args, phase, &inputs); - std::shared_ptr ret = DoExecGraph(anf_graph, inputs, phase); + std::shared_ptr ret = DoExecGraphAsync(anf_graph, inputs, phase); ConfigManager::GetInstance().ResetConfig(); if (ret != nullptr) { return *ret; diff --git a/mindspore/ccsrc/transform/graph_ir/graph_runner.cc b/mindspore/ccsrc/transform/graph_ir/graph_runner.cc index 982fedc37d0..9cdc8b202e3 100644 --- a/mindspore/ccsrc/transform/graph_ir/graph_runner.cc +++ b/mindspore/ccsrc/transform/graph_ir/graph_runner.cc @@ -29,6 +29,7 @@ #include "include/common/utils/callbacks.h" #ifdef ENABLE_D #include "include/common/utils/callbacks_ge.h" +#include "common/ge_inner_error_codes.h" #endif #include "utils/ms_context.h" @@ -169,6 +170,98 @@ Status GraphRunner::RunGraph(const RunOptions &options, const std::vector &inputs, + std::vector *outputs, const std::vector &me_types) { + std::string name = options.name; + if (name.empty()) { + MS_LOG(ERROR) << "The graph name is null"; + return Status::INVALID_ARGUMENT; + } + + DfGraphWrapperPtr wrap_ptr = graph_manager_.GetGraphByName(name); + if (wrap_ptr == nullptr) { + MS_LOG(ERROR) << "Get graph form DfGraphManager failed!"; + return Status::NOT_FOUND; + } + + if (wrap_ptr->graph_ptr_ == nullptr) { + MS_LOG(WARNING) << "The graph is null"; + return Status::NOT_FOUND; + } + + // call ge::RunGraphAsync() to exec a graph; + std::vector ge_inputs; + + (void)std::transform(inputs.begin(), inputs.end(), std::back_inserter(ge_inputs), + [](const GeTensorPtr &i) { return *i; }); + + MS_LOG(INFO) << "Run the graph in GE with " << ge_inputs.size() << " inputs"; + + struct timeval start_time, end_time; + (void)gettimeofday(&start_time, nullptr); + +#ifdef ENABLE_D + std::mutex mutex; + std::condition_variable condition; + bool is_finished = false; + bool end_of_sequence = false; + std::unique_lock lock(mutex); + auto call_back = [=, &is_finished, &end_of_sequence, &condition](ge::Status ge_status, + std::vector &ge_outputs) { + if (ge_status == Status::SUCCESS) { + if (me_types.size() != ge_outputs.size()) { + MS_LOG(EXCEPTION) << "Convert output ge tensor to me tensor failed."; + } + for (size_t i = 0; i < ge_outputs.size(); ++i) { + std::shared_ptr ge_tensor_ptr = std::make_shared(ge_outputs[i]); + auto me_tensor = TransformUtil::ConvertGeTensor(ge_tensor_ptr, me_types[i]); + if (me_tensor != nullptr) { + outputs->emplace_back(me_tensor); + } + } + is_finished = true; + } else if (ge_status == ge::END_OF_SEQUENCE) { + MS_LOG(WARNING) << "RunAsync out of range: End of sequence."; + end_of_sequence = true; + } else { + MS_LOG(ERROR) << "RunAsync failed."; + } + condition.notify_all(); + return; + }; + auto ms_context = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(ms_context); + if (ms_context->backend_policy() == "ge") { + if (sess_ == nullptr) { + MS_LOG(ERROR) << "The GE session is null, can't run the graph!"; + return Status::FAILED; + } + ge::Status ret = sess_->RunGraphAsync(static_cast(wrap_ptr->id_), ge_inputs, call_back); + if (ret != ge::GRAPH_SUCCESS) { + MS_LOG(ERROR) << "Call GE RunGraphAsync Failed, ret is: " << ret; + return Status::FAILED; + } + if (!is_finished) { + condition.wait(lock); + } + if (end_of_sequence) { + throw(std::runtime_error("End of sequence.")); + } + if (!is_finished) { + MS_LOG(ERROR) << "Call GE RunGraphAsync failed."; + return Status::FAILED; + } + } +#endif + (void)gettimeofday(&end_time, nullptr); + const uint64_t kUSecondInSecond = 1000000; + uint64_t cost = kUSecondInSecond * static_cast(end_time.tv_sec - start_time.tv_sec); + cost += static_cast(end_time.tv_usec - start_time.tv_usec); + MS_LOG(INFO) << "Call GE RunGraph Success in " << cost << " us, the GE outputs num is: " << outputs->size(); + + return Status::SUCCESS; +} + Status GraphRunner::RunGraph(const RunOptions &options, const std::vector &inputs, std::vector *const outputs) { std::vector ge_inputs; diff --git a/mindspore/ccsrc/transform/graph_ir/util.cc b/mindspore/ccsrc/transform/graph_ir/util.cc index fd772bb6f0b..d8a2e37f815 100644 --- a/mindspore/ccsrc/transform/graph_ir/util.cc +++ b/mindspore/ccsrc/transform/graph_ir/util.cc @@ -384,6 +384,18 @@ MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr &ge_tensor) { return GenerateMeTensor(ge_tensor, me_dims, type_id); } +MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr &ge_tensor, const TypeId &me_type) { + MS_EXCEPTION_IF_NULL(ge_tensor); + GeShape ge_shape = ge_tensor->GetTensorDesc().GetShape(); + vector me_dims = ConvertGeShape(ge_shape); + + if (me_type == MeDataType::kTypeUnknown) { + MS_LOG(ERROR) << "Unsupported data type: " << static_cast(me_type); + return nullptr; + } + return GenerateMeTensor(ge_tensor, me_dims, me_type); +} + // if request_dims is empty, use ge tensor's shape,otherwise convert to request shape MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr ge_tensor, const ShapeVector &request_dims) { MS_EXCEPTION_IF_NULL(ge_tensor);