diff --git a/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc b/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc index 5851e885b40..fb2a3f350b0 100644 --- a/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc +++ b/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc @@ -327,19 +327,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) { vector> task_info_list; auto anf_node_list = graph->execution_order(); TaskGenerator::GenTasks(anf_node_list, &task_info_list, graph->graph_id()); - // Store the task_info_list auto insert_ret = task_map_.insert(std::make_pair(graph->graph_id(), task_info_list)); if (!insert_ret.second) { MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session."; } - // Graph may have no compute node, such TensorAddGrad. if (task_info_list.empty()) { MS_LOG(WARNING) << "graph " << graph->graph_id() << " have no compute node"; return true; } - AscendStreamAssign &assign_instance = AscendStreamAssign::GetInstance(); AscendStreamMng &stream_manager = AscendStreamMng::GetInstance(); AscendLabelAssign &label_assign_instance = AscendLabelAssign::GetInstance(); @@ -348,19 +345,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) { assign_instance.GetWaitStreams(&wait_active_stream_list); std::vector force_copy_stream_list; assign_instance.GetHcomStreams(&force_copy_stream_list); - MS_LOG(INFO) << "call DavinciModel total stream num:" << stream_manager.GetCurAllocStreamNum() << ", total event num:" << assign_instance.total_event_num() << ", total label num:" << label_assign_instance.GetLabelNum(NOT_NULL(graph)) << ", wait_active_stream_list size:" << wait_active_stream_list.size() << ", force_copy_stream_list size:" << force_copy_stream_list.size(); - std::vector> empty_list; std::shared_ptr model = std::make_shared( task_info_list, empty_list, empty_list, empty_list, empty_list, wait_active_stream_list, force_copy_stream_list, 0, 0, 0, 0, 0, 0, stream_manager.GetCurAllocStreamNum(), label_assign_instance.GetLabelNum(NOT_NULL(graph)), assign_instance.total_event_num(), 0); - auto ret = graph_model_map_.insert(std::make_pair(graph->graph_id(), model)); if (!ret.second) { MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session."; diff --git a/mindspore/ccsrc/device/cpu/cpu_kernel_runtime.cc b/mindspore/ccsrc/device/cpu/cpu_kernel_runtime.cc index 7ef1cb31571..6725dff5247 100644 --- a/mindspore/ccsrc/device/cpu/cpu_kernel_runtime.cc +++ b/mindspore/ccsrc/device/cpu/cpu_kernel_runtime.cc @@ -147,20 +147,18 @@ BaseRef CPUKernelRuntime::CreatTensorForOutput(const session::KernelWithIndex &k auto &input_node = kernel_with_index.first; auto index = kernel_with_index.second; MS_EXCEPTION_IF_NULL(input_node); - if (input_node->isa() && AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) { - auto cnode = input_node->cast(); - MS_EXCEPTION_IF_NULL(cnode); - VectorRef ret; - for (size_t i = 1; i < cnode->inputs().size(); i++) { - auto item_with_index = AnfAlgo::VisitKernelWithReturnType(cnode->input(i), 0); - auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs); - ret.push_back(out); - } - return ret; - } if (input_node->isa()) { auto node = input_node->cast(); MS_EXCEPTION_IF_NULL(node); + if (AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) { + VectorRef ret; + for (size_t i = 1; i < node->inputs().size(); i++) { + auto item_with_index = AnfAlgo::VisitKernelWithReturnType(node->input(i), 0); + auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs); + ret.push_back(out); + } + return ret; + } size_t output_size = AnfAlgo::GetOutputTensorNum(node); if (index >= output_size) { MS_LOG(EXCEPTION) << "Invalid input index " << index; diff --git a/mindspore/ccsrc/kernel/common_utils.cc b/mindspore/ccsrc/kernel/common_utils.cc index 9b578eb104a..34a12315424 100644 --- a/mindspore/ccsrc/kernel/common_utils.cc +++ b/mindspore/ccsrc/kernel/common_utils.cc @@ -577,6 +577,52 @@ void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGr unique_grad->indices_size_ = unique_indices_size; } +struct WorkerParamsForReduceSparseGradient { + size_t slice_start_{0}; + size_t slice_end_{0}; + size_t max_length_{0}; + size_t outer_dim_{0}; + std::vector> *sorted_indices_{nullptr}; + std::vector *slice_positions_{nullptr}; + float *src_value_{nullptr}; + SparseGradient *unique_grad_{nullptr}; +}; + +void WorkerForReduceSparseGradient(WorkerParamsForReduceSparseGradient param) { + MS_EXCEPTION_IF_NULL(param.sorted_indices_); + MS_EXCEPTION_IF_NULL(param.slice_positions_); + MS_EXCEPTION_IF_NULL(param.src_value_); + MS_EXCEPTION_IF_NULL(param.unique_grad_); + auto outer_dim = param.outer_dim_; + auto &sorted_indices = *(param.sorted_indices_); + auto &slice_positions = *(param.slice_positions_); + auto unique_grad = param.unique_grad_; + for (size_t slice_id = param.slice_start_; slice_id < param.slice_end_; ++slice_id) { + size_t cur_pos = slice_positions[slice_id]; + int index = sorted_indices[cur_pos].first; + unique_grad->indices_[slice_id] = index; + size_t start_index = slice_id * outer_dim; + auto ret_code = memcpy_s(unique_grad->value_ + start_index, (param.max_length_ - start_index) * sizeof(float), + param.src_value_ + sorted_indices[cur_pos].second, outer_dim * sizeof(float)); + if (ret_code != EOK) { + MS_LOG(EXCEPTION) << "Failed to copy data!"; + } + cur_pos++; + size_t end_pos; + if (slice_id + 1 < slice_positions.size()) { + end_pos = slice_positions[slice_id + 1]; + } else { + end_pos = sorted_indices.size(); + } + while (cur_pos < end_pos) { + for (size_t i = 0; i < outer_dim; ++i) { + unique_grad->value_[start_index + i] += param.src_value_[sorted_indices[cur_pos].second + i]; + } + cur_pos++; + } + } +} + void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim, size_t outer_dim) { MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_); @@ -584,47 +630,50 @@ void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradie MS_EXCEPTION_IF_NULL(unique_grad); MS_EXCEPTION_IF_NULL(unique_grad->value_); MS_EXCEPTION_IF_NULL(unique_grad->indices_); - size_t unique_indices_size = 0; std::vector> sorted_indices; sorted_indices.reserve(origin_sparse_grad.indices_size_); for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) { int index = origin_sparse_grad.indices_[i]; - if (index < 0 || IntToSize(index) >= first_dim) { - continue; + if (index >= 0 && IntToSize(index) < first_dim) { + sorted_indices.emplace_back(std::pair(index, i * outer_dim)); } - sorted_indices.emplace_back(std::pair(index, i * outer_dim)); } std::sort( sorted_indices.begin(), sorted_indices.end(), [](const std::pair &left, const std::pair &right) { return left.first < right.first; }); - int last_index = 0; - size_t indices_size = sorted_indices.size(); - size_t start_index = 0; - size_t end_index = outer_dim; - size_t dst_len = indices_size * outer_dim; - for (size_t i = 0; i < indices_size; ++i) { - int index = sorted_indices[i].first; - if (i == 0 || last_index != index) { - if (i > 0 && last_index != index) { - unique_indices_size++; - start_index += outer_dim; - end_index += outer_dim; - } - unique_grad->indices_[unique_indices_size] = index; - auto ret_code = memcpy_s(unique_grad->value_ + start_index, dst_len - start_index, - origin_sparse_grad.value_ + sorted_indices[i].second, outer_dim); - if (ret_code != EOK) { - MS_LOG(EXCEPTION) << "Failed to copy data!"; - } - } else { - for (size_t j = start_index, k = sorted_indices[i].second; j < end_index; ++j, ++k) { - unique_grad->value_[j] += origin_sparse_grad.value_[k]; - } + std::vector slice_positions; + for (size_t i = 0; i < sorted_indices.size(); ++i) { + if (i == 0 || last_index != sorted_indices[i].first) { + slice_positions.emplace_back(i); } - last_index = index; + last_index = sorted_indices[i].first; } - unique_grad->indices_size_ = unique_indices_size + 1; + size_t thread_num = 8; + if (slice_positions.size() < thread_num) { + thread_num = slice_positions.size(); + } + size_t stride = (slice_positions.size() + thread_num - 1) / thread_num; + thread_num = (slice_positions.size() + stride - 1) / stride; + std::vector threads; + size_t max_length = sorted_indices.size() * outer_dim; + for (size_t i = 0; i < thread_num; ++i) { + size_t slice_start = i * stride; + size_t slice_end = 0; + if (i == thread_num - 1) { + slice_end = slice_positions.size(); + } else { + slice_end = slice_start + stride; + } + WorkerParamsForReduceSparseGradient params{ + slice_start, slice_end, max_length, outer_dim, &sorted_indices, &slice_positions, origin_sparse_grad.value_, + unique_grad}; + threads.emplace_back(std::thread(WorkerForReduceSparseGradient, params)); + } + for (size_t i = 0; i < thread_num; ++i) { + threads[i].join(); + } + unique_grad->indices_size_ = slice_positions.size(); } std::pair GetKernelInput(const AnfNodePtr &anf_node, size_t index) {