!33991 [GeAdptor]Ge RunGraphAsync

Merge pull request !33991 from weiyang/rungraphasync
This commit is contained in:
i-robot 2022-05-11 01:58:48 +00:00 committed by Gitee
commit e195cba9a6
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 162 additions and 15 deletions

View File

@ -51,6 +51,8 @@ class COMMON_EXPORT GraphRunner {
~GraphRunner() { sess_ = nullptr; }
Status RunGraph(const RunOptions &options, const std::vector<MeTensorPtr> &inputs, std::vector<MeTensorPtr> *outputs);
Status RunGraph(const RunOptions &options, const std::vector<GeTensorPtr> &inputs, std::vector<GeTensorPtr> *outputs);
Status RunGraph(const RunOptions &options, const std::vector<GeTensorPtr> &inputs, std::vector<MeTensorPtr> *outputs,
const std::vector<TypeId> &me_types);
static std::shared_ptr<ge::Session> NewSession(const SessionOptions &sess_options);

View File

@ -105,6 +105,15 @@ class COMMON_EXPORT TransformUtil {
* */
static MeTensorPtr ConvertGeTensor(const GeTensorPtr &tensor);
* Parameters:
* tensor: [GeTensor] the data tensor in GE
* me_type: [TypeId] the type of created Me tensor
* Return
* [MeTensor] the data tensor in ME
* */
static MeTensorPtr ConvertGeTensor(const GeTensorPtr &tensor, const TypeId &me_type);
* Parameters:
* tensor: [GeTensor] the data tensor in GE

View File

@ -1539,10 +1539,10 @@ void InitPipeline() {
// open tsd before ge initialize
auto ms_context = MsContext::GetInstance();
if (!context::OpenTsd(ms_context)) {
MS_LOG(EXCEPTION) << "Open tsd failed";
void FinalizeBackend() {

View File

@ -38,6 +38,7 @@ namespace pipeline {
using Tensor = mindspore::tensor::Tensor;
using MetaTensor = mindspore::tensor::MetaTensor;
using TensorOrderMap = std::map<std::string, std::shared_ptr<Tensor>>;
using mindspore::abstract::AbstractScalar;
using mindspore::abstract::AbstractTensor;
using mindspore::abstract::AbstractTuple;
using mindspore::abstract::AbstractTuplePtr;
@ -397,14 +398,36 @@ py::object StructureOutput(const AnfNodePtr &output_node, const py::tuple &data,
return ExtractGeneralCnodeRet(output_c->abstract(), data, count);
std::shared_ptr<py::object> DoExecGraph(const FuncGraphPtr &graph, const std::vector<MeTensorPtr> &inputs,
void GetMeRetDataType(const AbstractBasePtr &cnode_data, std::vector<TypeId> *me_types) {
if (cnode_data->isa<AbstractTensor>()) {
TypeId me_type = cnode_data->BuildType()->type_id();
if (me_type == kObjectTypeTensorType) {
me_type = dyn_cast<TensorType>(cnode_data->BuildType())->element()->type_id();
if (cnode_data->isa<AbstractScalar>()) {
TypeId me_type = cnode_data->BuildType()->type_id();
auto abstract_tuple = cnode_data->cast<AbstractTuplePtr>();
auto elements = abstract_tuple->elements();
for (size_t i = 0; i < abstract_tuple->size(); ++i) {
GetMeRetDataType(elements[i], me_types);
std::shared_ptr<py::object> DoExecGraphAsync(const FuncGraphPtr &graph, const std::vector<MeTensorPtr> &inputs,
const std::string &phase) {
std::vector<GeTensorPtr> ge_tensors = TransformUtil::ConvertInputTensors(inputs, kOpFormat_NCHW);
if (ge_tensors.size() != inputs.size()) {
MS_LOG(EXCEPTION) << "Convert me args to ge tensor error.";
std::vector<GeTensorPtr> ge_outputs;
transform::RunOptions run_options;
run_options.name = phase;
auto graph_runner = DfGraphManager::GetInstance().GetGraphRunner();
@ -412,20 +435,31 @@ std::shared_ptr<py::object> DoExecGraph(const FuncGraphPtr &graph, const std::ve
MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
AnfNodePtr output_node = graph->get_return()->input(1);
std::vector<TypeId> me_types;
auto output_c = output_node->cast<CNodePtr>()->abstract();
// get output node data types
GetMeRetDataType(output_c, &me_types);
std::vector<MeTensorPtr> me_outputs;
// Release GIL before calling into (potentially long-running) C++ code
py::gil_scoped_release release;
MS_LOG(DEBUG) << "Run graph begin, inputs size is: " << inputs.size();
Status ret = graph_runner->RunGraph(run_options, ge_tensors, &ge_outputs);
MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << ge_outputs.size();
try {
Status ret = graph_runner->RunGraph(run_options, ge_tensors, &me_outputs, me_types);
MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << me_outputs.size();
if (ret != Status::SUCCESS) {
MS_LOG(ERROR) << "Exec graph failed";
return nullptr;
} catch (const std::exception &ex) {
std::vector<MeTensorPtr> me_outputs = TransformUtil::ConvertGeTensors(ge_outputs);
if (me_outputs.size() != ge_outputs.size()) {
if (me_outputs.size() != me_types.size()) {
MS_LOG(WARNING) << "Convert output Ge tensor to Me tensor failed";
@ -435,9 +469,6 @@ std::shared_ptr<py::object> DoExecGraph(const FuncGraphPtr &graph, const std::ve
std::shared_ptr<py::object> ret = nullptr;
AnfNodePtr output_node = graph->get_return()->input(1);
size_t count = 0;
py::object oj = StructureOutput(output_node, outputs, &count);
ret = std::make_shared<py::object>(oj);
@ -501,7 +532,7 @@ py::object ExecDFGraph(const std::map<std::string, ExecutorInfoPtr> &info, const
std::vector<tensor::TensorPtr> inputs;
ProcessGeArg(info, args, phase, &inputs);
std::shared_ptr<py::object> ret = DoExecGraph(anf_graph, inputs, phase);
std::shared_ptr<py::object> ret = DoExecGraphAsync(anf_graph, inputs, phase);
if (ret != nullptr) {
return *ret;

View File

@ -29,6 +29,7 @@
#include "include/common/utils/callbacks.h"
#ifdef ENABLE_D
#include "include/common/utils/callbacks_ge.h"
#include "common/ge_inner_error_codes.h"
#include "utils/ms_context.h"
@ -169,6 +170,98 @@ Status GraphRunner::RunGraph(const RunOptions &options, const std::vector<GeTens
return Status::SUCCESS;
Status GraphRunner::RunGraph(const RunOptions &options, const std::vector<GeTensorPtr> &inputs,
std::vector<MeTensorPtr> *outputs, const std::vector<TypeId> &me_types) {
std::string name = options.name;
if (name.empty()) {
MS_LOG(ERROR) << "The graph name is null";
return Status::INVALID_ARGUMENT;
DfGraphWrapperPtr wrap_ptr = graph_manager_.GetGraphByName(name);
if (wrap_ptr == nullptr) {
MS_LOG(ERROR) << "Get graph form DfGraphManager failed!";
return Status::NOT_FOUND;
if (wrap_ptr->graph_ptr_ == nullptr) {
MS_LOG(WARNING) << "The graph is null";
return Status::NOT_FOUND;
// call ge::RunGraphAsync() to exec a graph;
std::vector<GeTensor> ge_inputs;
(void)std::transform(inputs.begin(), inputs.end(), std::back_inserter(ge_inputs),
[](const GeTensorPtr &i) { return *i; });
MS_LOG(INFO) << "Run the graph in GE with " << ge_inputs.size() << " inputs";
struct timeval start_time, end_time;
(void)gettimeofday(&start_time, nullptr);
#ifdef ENABLE_D
std::mutex mutex;
std::condition_variable condition;
bool is_finished = false;
bool end_of_sequence = false;
std::unique_lock<std::mutex> lock(mutex);
auto call_back = [=, &is_finished, &end_of_sequence, &condition](ge::Status ge_status,
std::vector<ge::Tensor> &ge_outputs) {
if (ge_status == Status::SUCCESS) {
if (me_types.size() != ge_outputs.size()) {
MS_LOG(EXCEPTION) << "Convert output ge tensor to me tensor failed.";
for (size_t i = 0; i < ge_outputs.size(); ++i) {
std::shared_ptr<ge::Tensor> ge_tensor_ptr = std::make_shared<ge::Tensor>(ge_outputs[i]);
auto me_tensor = TransformUtil::ConvertGeTensor(ge_tensor_ptr, me_types[i]);
if (me_tensor != nullptr) {
is_finished = true;
} else if (ge_status == ge::END_OF_SEQUENCE) {
MS_LOG(WARNING) << "RunAsync out of range: End of sequence.";
end_of_sequence = true;
} else {
MS_LOG(ERROR) << "RunAsync failed.";
auto ms_context = MsContext::GetInstance();
if (ms_context->backend_policy() == "ge") {
if (sess_ == nullptr) {
MS_LOG(ERROR) << "The GE session is null, can't run the graph!";
return Status::FAILED;
ge::Status ret = sess_->RunGraphAsync(static_cast<uint32_t>(wrap_ptr->id_), ge_inputs, call_back);
if (ret != ge::GRAPH_SUCCESS) {
MS_LOG(ERROR) << "Call GE RunGraphAsync Failed, ret is: " << ret;
return Status::FAILED;
if (!is_finished) {
if (end_of_sequence) {
throw(std::runtime_error("End of sequence."));
if (!is_finished) {
MS_LOG(ERROR) << "Call GE RunGraphAsync failed.";
return Status::FAILED;
(void)gettimeofday(&end_time, nullptr);
const uint64_t kUSecondInSecond = 1000000;
uint64_t cost = kUSecondInSecond * static_cast<uint64_t>(end_time.tv_sec - start_time.tv_sec);
cost += static_cast<uint64_t>(end_time.tv_usec - start_time.tv_usec);
MS_LOG(INFO) << "Call GE RunGraph Success in " << cost << " us, the GE outputs num is: " << outputs->size();
return Status::SUCCESS;
Status GraphRunner::RunGraph(const RunOptions &options, const std::vector<MeTensorPtr> &inputs,
std::vector<MeTensorPtr> *const outputs) {
std::vector<GeTensorPtr> ge_inputs;

View File

@ -384,6 +384,18 @@ MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr &ge_tensor) {
return GenerateMeTensor(ge_tensor, me_dims, type_id);
MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr &ge_tensor, const TypeId &me_type) {
GeShape ge_shape = ge_tensor->GetTensorDesc().GetShape();
vector<int64_t> me_dims = ConvertGeShape(ge_shape);
if (me_type == MeDataType::kTypeUnknown) {
MS_LOG(ERROR) << "Unsupported data type: " << static_cast<int>(me_type);
return nullptr;
return GenerateMeTensor(ge_tensor, me_dims, me_type);
// if request_dims is empty, use ge tensor's shape,otherwise convert to request shape
MeTensorPtr TransformUtil::ConvertGeTensor(const GeTensorPtr ge_tensor, const ShapeVector &request_dims) {