forked from mindspore-Ecosystem/mindspore
Fix core dump when running graph and PyNative mode together
parent: 6b430d7f33
commit: cdf3868701
@@ -47,12 +47,13 @@ bool IsDeviceQueueDSActor(const AnfNodePtr &node) {
 bool IsSwitchActor(const AnfNodePtr &node) { return AnfAlgo::CheckPrimitiveType(node, prim::kPrimSwitch); }
 
 bool IsHostQueueDSActor(const AnfNodePtr &node, const KernelGraphPtr &graph, const TensorPtr &tensor,
-                        const std::vector<AnfNodePtr> &host_parameters) {
+                        const std::vector<AnfNodePtr> &host_parameters, GraphExecutionStrategy strategy) {
   MS_EXCEPTION_IF_NULL(node);
   if (node->isa<Parameter>() && (!AnfAlgo::IsParameterWeight(node->cast<ParameterPtr>()))) {
     // There is device address in tensor, indicating the input tensor is certain kernel's output,
     // so it's unnecessary to put the input node to host queue data source actor.
-    if (tensor != nullptr && std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address()) != nullptr) {
+    if (strategy == GraphExecutionStrategy::kStep && tensor != nullptr &&
+        std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address()) != nullptr) {
       return false;
     }
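The behavioral change in this hunk: the early exclusion from the host queue now applies only under kStep execution, so pipeline runs keep such inputs in the host data source. Below is a minimal self-contained sketch of just that predicate; `Tensor`, `DeviceAddress`, and `IsHostQueueInput` are hypothetical stand-ins for the MindSpore types, not the real API.

```cpp
// Minimal stand-alone sketch of the gated predicate above. Tensor and
// DeviceAddress are invented stand-ins for tensor::Tensor and DeviceTensor;
// only the control flow of the check is reproduced.
#include <cassert>
#include <memory>

enum class GraphExecutionStrategy { kPipeline, kStep };

struct DeviceAddress {};

struct Tensor {
  std::shared_ptr<DeviceAddress> device_address;
};

// In kStep mode a tensor that already owns a device address is some kernel's
// output and is excluded from the host queue; in kPipeline mode it is kept.
bool IsHostQueueInput(const std::shared_ptr<Tensor> &tensor, GraphExecutionStrategy strategy) {
  if (strategy == GraphExecutionStrategy::kStep && tensor != nullptr &&
      tensor->device_address != nullptr) {
    return false;
  }
  return true;
}

int main() {
  auto t = std::make_shared<Tensor>();
  t->device_address = std::make_shared<DeviceAddress>();
  assert(!IsHostQueueInput(t, GraphExecutionStrategy::kStep));     // excluded in step mode
  assert(IsHostQueueInput(t, GraphExecutionStrategy::kPipeline));  // still a host input in pipeline mode
  return 0;
}
```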
@@ -49,6 +49,11 @@ constexpr int kFailure = 1;
     return; \
   }
 
+enum class GraphExecutionStrategy {
+  kPipeline,  // The actor running is triggered only by data.
+  kStep  // The actor running need be triggered by control in addition.
+};
+
 // Get the max available thread number of system.
 int64_t GetMaxThreadNum();
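Moving the enum into this common header makes it visible to both the scheduler and the actor helpers above. As a rough, hypothetical illustration of the two triggering modes the comments describe (`ToyActor` and `CanRun` are invented names, not part of the runtime):

```cpp
// Hypothetical illustration of the two triggering modes named by the enum
// comments: a pipeline actor fires as soon as its data arrive, a step actor
// additionally waits for an explicit control trigger. Not the real runtime.
enum class GraphExecutionStrategy { kPipeline, kStep };

struct ToyActor {
  bool data_ready = false;
  bool control_triggered = false;

  bool CanRun(GraphExecutionStrategy strategy) const {
    if (strategy == GraphExecutionStrategy::kPipeline) {
      return data_ready;                     // triggered only by data
    }
    return data_ready && control_triggered;  // needs control in addition
  }
};
```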
@@ -57,7 +62,8 @@ bool IsDeviceQueueDSActor(const AnfNodePtr &node);
 // Host parameters are parameters of root funcgraph, in control flow, only the parameters of the root funcgraph are
 // in the host data source.
 bool IsHostQueueDSActor(const AnfNodePtr &node, const KernelGraphPtr &graph = nullptr,
-                        const TensorPtr &tensor = nullptr, const std::vector<AnfNodePtr> &host_parameters = {});
+                        const TensorPtr &tensor = nullptr, const std::vector<AnfNodePtr> &host_parameters = {},
+                        GraphExecutionStrategy strategy = GraphExecutionStrategy::kStep);
 bool IsKernelActor(const AnfNodePtr &node);
 bool IsSwitchActor(const AnfNodePtr &node);
 // The skip kernel doesn't run, it exists in the inplace optimizer.
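Because the new `strategy` parameter is appended with a default of kStep, every pre-existing caller of `IsHostQueueDSActor` keeps compiling and keeps the old (kStep-style) exclusion behavior. A generic sketch of this pattern, with invented names:

```cpp
// Generic illustration (hypothetical names) of extending an API with a
// trailing defaulted parameter: old call sites keep compiling unchanged,
// new call sites can opt in to the extra argument.
enum class Mode { kPipeline, kStep };

bool Classify(int node_id, Mode mode = Mode::kStep) {
  return mode == Mode::kStep ? node_id % 2 == 0 : true;
}

void Callers() {
  Classify(42);                  // pre-existing call site, unchanged
  Classify(42, Mode::kPipeline); // new call site passing the strategy
}
```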
@@ -241,6 +241,11 @@ void HostQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *cont
     auto &device_tensor = device_tensors[i];
     MS_EXCEPTION_IF_NULL(host_tensor);
     MS_EXCEPTION_IF_NULL(device_tensor);
+
+    if (std::dynamic_pointer_cast<DeviceTensor>(host_tensor->device_address()) != nullptr) {
+      continue;
+    }
+
     if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(data_nodes_[i], 0),
                                          LongToSize(host_tensor->data().nbytes()), host_tensor->data_type(),
                                          host_tensor->data_c(), host_tensor->device_info().host_format_)) {
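The inserted guard makes `OnMemoryAllocFinish` skip the host-to-device copy whenever the host tensor already owns a device address, i.e. the data is already device-resident and must not be overwritten. A simplified stand-alone sketch (all types are invented stand-ins for the MindSpore internals):

```cpp
// Simplified sketch of the new guard: entries whose host tensor already
// carries a device address are skipped instead of being synced again.
#include <cstddef>
#include <memory>
#include <vector>

struct DeviceAddress {
  bool SyncHostToDevice(const void *host_data, size_t size) { return host_data != nullptr && size > 0; }
};

struct HostTensor {
  std::shared_ptr<DeviceAddress> device_address;  // non-null: already on device
  std::vector<char> data;
};

bool CopyHostTensorsToDevice(const std::vector<HostTensor> &host_tensors,
                             std::vector<std::shared_ptr<DeviceAddress>> *device_tensors) {
  for (size_t i = 0; i < host_tensors.size(); ++i) {
    const auto &host_tensor = host_tensors[i];
    // New guard: the tensor is already backed by device memory, do not sync.
    if (host_tensor.device_address != nullptr) {
      continue;
    }
    if (!(*device_tensors)[i]->SyncHostToDevice(host_tensor.data.data(), host_tensor.data.size())) {
      return false;
    }
  }
  return true;
}
```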
@@ -285,14 +285,15 @@ void PrepareDataForHostDataSourceActor(const std::unordered_map<AnfNodePtr, size
                                        const AnfNodePtr &node, const TensorPtr &tensor,
                                        std::vector<TensorPtr> *host_tensors) {
   MS_EXCEPTION_IF_NULL(tensor);
-  if (std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address()) != nullptr) {
-    return;
-  }
 
   // Fill the host tensors for non weighted parameters.
   const auto &iter = data_node_position_map.find(node);
   if (iter != data_node_position_map.end()) {
     (*host_tensors)[iter->second] = tensor;
+    auto device_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
+    if (device_address != nullptr) {
+      AnfAlgo::SetOutputAddr(device_address, 0, node.get());
+    }
   }
 }
 }  // namespace
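Rather than returning early when the tensor already has a device address, the function now always records the tensor in `host_tensors` and re-attaches the existing address to the parameter node via `AnfAlgo::SetOutputAddr`. A stand-alone sketch of the reshaped control flow, with invented stand-in types:

```cpp
// Stand-alone sketch of the reshaped PrepareDataForHostDataSourceActor: the
// tensor is always recorded, and an existing device address is rebound to
// the node instead of causing an early return. Stand-in types only.
#include <memory>
#include <unordered_map>
#include <vector>

struct DeviceAddress {};

struct Tensor {
  std::shared_ptr<DeviceAddress> device_address;
};
using TensorPtr = std::shared_ptr<Tensor>;

struct Node {
  std::shared_ptr<DeviceAddress> output_address;  // stand-in for the SetOutputAddr target
};
using NodePtr = std::shared_ptr<Node>;

void PrepareHostData(const std::unordered_map<NodePtr, size_t> &position_map, const NodePtr &node,
                     const TensorPtr &tensor, std::vector<TensorPtr> *host_tensors) {
  const auto iter = position_map.find(node);
  if (iter == position_map.end()) {
    return;
  }
  (*host_tensors)[iter->second] = tensor;  // always fill the host tensor slot
  if (tensor->device_address != nullptr) {
    node->output_address = tensor->device_address;  // analogue of AnfAlgo::SetOutputAddr
  }
}
```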
@@ -434,7 +435,8 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
 }
 
 void GraphScheduler::PrepareRun(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
-                                const std::vector<std::vector<TensorPtr>> &input_tensors) {
+                                const std::vector<std::vector<TensorPtr>> &input_tensors,
+                                GraphExecutionStrategy strategy) {
   MS_EXCEPTION_IF_NULL(actor_set);
   std::vector<TensorPtr> host_tensors;
   std::string actor_name = actor_set->name_ + "_HostDSActor";
@@ -466,7 +468,8 @@ void GraphScheduler::PrepareRun(const ActorSet *actor_set, const GraphCompilerIn
         // Prepare the device data for weights.
         const auto front_node = FetchFrontNodeByBackendNode(input_node, graph);
         PrepareDataForWeightNode(input_node, front_node, input_tensor, device_context);
-      } else if (IsHostQueueDSActor(input_node, graph, input_tensor, graph_compiler_info.origin_parameters_order_)) {
+      } else if (IsHostQueueDSActor(input_node, graph, input_tensor, graph_compiler_info.origin_parameters_order_,
+                                    strategy)) {
         MS_EXCEPTION_IF_NULL(host_data_source_actor);
         PrepareDataForHostDataSourceActor(host_data_source_actor->data_node_position_map_, input_node, input_tensor,
                                           &host_tensors);
@@ -57,11 +57,6 @@ using GraphOutputPair = std::pair<OpActor<DeviceTensor> *, size_t>;
 // second element is op arrow between actors.
 using DataArrowPair = std::pair<AID, DataArrowPtr>;
 
-enum class GraphExecutionStrategy {
-  kPipeline,  // The actor running is triggered only by data.
-  kStep  // The actor running need be triggered by control in addition.
-};
-
 // The graph compiler info generated by graph compiler is the express of executable graph.
 // The device context is unified interface of interaction with device of corresponding graph.
 // The tensors mask is used to distinguish input tensor's type.
@@ -153,7 +148,8 @@ class GraphScheduler {
   // 2. Prepare the data of host tensor queue(such as non weighted parameters of graph).
   // 3. Prepare the continuous memory for communication kernel.
   void PrepareRun(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
-                  const std::vector<std::vector<TensorPtr>> &input_tensors);
+                  const std::vector<std::vector<TensorPtr>> &input_tensors,
+                  GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
 
   // The processing entry of actors running.
   bool Run(const ActorSet *actor_set, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline,
@@ -752,7 +752,8 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, OpRunInfo *op_run_info
     }
   }
 
-  runtime::GraphScheduler::GetInstance().PrepareRun(actor_set, graph_compiler_info, {tensors_without_value_node});
+  runtime::GraphScheduler::GetInstance().PrepareRun(actor_set, graph_compiler_info, {tensors_without_value_node},
+                                                    runtime::GraphExecutionStrategy::kStep);
   if (!runtime::GraphScheduler::GetInstance().Run(actor_set, runtime::GraphExecutionStrategy::kStep, input_tensors)) {
     MS_LOG(EXCEPTION) << "The actor runs failed, actor name: " << actor_set->name_;
   }
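Taken together with the scheduler changes above, the PyNative path now passes kStep to both `PrepareRun` and `Run`, where previously `PrepareRun` implicitly prepared in pipeline fashion while `Run` executed in step mode. A toy facade (invented names, not the MindSpore API) showing that call pattern:

```cpp
// Toy facade mirroring the scheduler's entry points: both take a defaulted
// kPipeline strategy, and the PyNative path (as in the hunk above) overrides
// it with kStep for BOTH preparation and execution.
#include <iostream>

enum class GraphExecutionStrategy { kPipeline, kStep };

struct ToyScheduler {
  void PrepareRun(GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline) {
    std::cout << "prepare, step mode: " << (strategy == GraphExecutionStrategy::kStep) << "\n";
  }
  bool Run(GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline) {
    std::cout << "run, step mode: " << (strategy == GraphExecutionStrategy::kStep) << "\n";
    return true;
  }
};

int main() {
  ToyScheduler scheduler;
  // Graph (pipeline) path: the defaults suffice.
  scheduler.PrepareRun();
  scheduler.Run();
  // PyNative single-op path: pass kStep to both calls, keeping preparation
  // and execution in the same mode (the mismatch this commit removes).
  scheduler.PrepareRun(GraphExecutionStrategy::kStep);
  scheduler.Run(GraphExecutionStrategy::kStep);
  return 0;
}
```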