forked from mindspore-Ecosystem/mindspore
parallel reduce sparse gradient
This commit is contained in:
parent bd0c538470
commit b867d6d60a
@@ -327,19 +327,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) {
   vector<std::shared_ptr<TaskInfo>> task_info_list;
   auto anf_node_list = graph->execution_order();
   TaskGenerator::GenTasks(anf_node_list, &task_info_list, graph->graph_id());

   // Store the task_info_list
   auto insert_ret = task_map_.insert(std::make_pair(graph->graph_id(), task_info_list));
   if (!insert_ret.second) {
     MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session.";
   }

   // Graph may have no compute node, such as TensorAddGrad.
   if (task_info_list.empty()) {
     MS_LOG(WARNING) << "graph " << graph->graph_id() << " has no compute node";
     return true;
   }

   AscendStreamAssign &assign_instance = AscendStreamAssign::GetInstance();
   AscendStreamMng &stream_manager = AscendStreamMng::GetInstance();
   AscendLabelAssign &label_assign_instance = AscendLabelAssign::GetInstance();
@@ -348,19 +345,16 @@ bool AscendKernelRuntime::GenTask(const session::KernelGraph *graph) {
   assign_instance.GetWaitStreams(&wait_active_stream_list);
   std::vector<uint32_t> force_copy_stream_list;
   assign_instance.GetHcomStreams(&force_copy_stream_list);

   MS_LOG(INFO) << "call DavinciModel total stream num:" << stream_manager.GetCurAllocStreamNum()
                << ", total event num:" << assign_instance.total_event_num()
                << ", total label num:" << label_assign_instance.GetLabelNum(NOT_NULL(graph))
                << ", wait_active_stream_list size:" << wait_active_stream_list.size()
                << ", force_copy_stream_list size:" << force_copy_stream_list.size();

   std::vector<std::shared_ptr<ge::model_runner::OpInfo>> empty_list;
   std::shared_ptr<ge::model_runner::DavinciModel> model = std::make_shared<ge::model_runner::DavinciModel>(
     task_info_list, empty_list, empty_list, empty_list, empty_list, wait_active_stream_list, force_copy_stream_list, 0,
     0, 0, 0, 0, 0, stream_manager.GetCurAllocStreamNum(), label_assign_instance.GetLabelNum(NOT_NULL(graph)),
     assign_instance.total_event_num(), 0);

   auto ret = graph_model_map_.insert(std::make_pair(graph->graph_id(), model));
   if (!ret.second) {
     MS_LOG(EXCEPTION) << "Duplicate GraphId! Please check in ascend_session.";
@@ -147,20 +147,18 @@ BaseRef CPUKernelRuntime::CreatTensorForOutput(const session::KernelWithIndex &k
   auto &input_node = kernel_with_index.first;
   auto index = kernel_with_index.second;
   MS_EXCEPTION_IF_NULL(input_node);
-  if (input_node->isa<CNode>() && AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) {
-    auto cnode = input_node->cast<CNodePtr>();
-    MS_EXCEPTION_IF_NULL(cnode);
-    VectorRef ret;
-    for (size_t i = 1; i < cnode->inputs().size(); i++) {
-      auto item_with_index = AnfAlgo::VisitKernelWithReturnType(cnode->input(i), 0);
-      auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs);
-      ret.push_back(out);
-    }
-    return ret;
-  }
   if (input_node->isa<CNode>()) {
     auto node = input_node->cast<CNodePtr>();
     MS_EXCEPTION_IF_NULL(node);
+    if (AnfAlgo::GetCNodeName(input_node) == prim::kPrimMakeTuple->name()) {
+      VectorRef ret;
+      for (size_t i = 1; i < node->inputs().size(); i++) {
+        auto item_with_index = AnfAlgo::VisitKernelWithReturnType(node->input(i), 0);
+        auto out = CreatTensorForOutput(item_with_index, input_map, bound_addresses, need_sync_outputs);
+        ret.push_back(out);
+      }
+      return ret;
+    }
     size_t output_size = AnfAlgo::GetOutputTensorNum(node);
     if (index >= output_size) {
       MS_LOG(EXCEPTION) << "Invalid input index " << index;
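The hunk above folds the old standalone MakeTuple branch into the general CNode branch; the recursive flattening of tuple outputs itself is unchanged. A minimal standalone sketch of that recursion, using toy stand-ins (Node, FlattenOutputs) rather than the MindSpore CNode/VectorRef types:

#include <iostream>
#include <memory>
#include <vector>

// Simplified stand-in: a node is either a leaf tensor or a tuple of nodes.
struct Node {
  bool is_tuple{false};
  int tensor_id{0};                           // meaningful for leaves
  std::vector<std::shared_ptr<Node>> inputs;  // meaningful for tuples
};

// Mirrors the shape of the CreatTensorForOutput recursion: tuples are visited
// input by input and each child result is appended to the flattened output.
void FlattenOutputs(const std::shared_ptr<Node> &node, std::vector<int> *out) {
  if (node->is_tuple) {
    for (const auto &input : node->inputs) {
      FlattenOutputs(input, out);
    }
    return;
  }
  out->push_back(node->tensor_id);
}

int main() {
  auto leaf = [](int id) {
    auto n = std::make_shared<Node>();
    n->tensor_id = id;
    return n;
  };
  auto inner = std::make_shared<Node>();
  inner->is_tuple = true;
  inner->inputs = {leaf(2), leaf(3)};
  auto root = std::make_shared<Node>();
  root->is_tuple = true;
  root->inputs = {leaf(1), inner};

  std::vector<int> flat;
  FlattenOutputs(root, &flat);
  for (int id : flat) {
    std::cout << id << " ";  // prints: 1 2 3
  }
  std::cout << "\n";
}

The real CreatTensorForOutput starts its loop at input 1 because input 0 of a CNode is the primitive itself.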
@@ -577,6 +577,52 @@ void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGr
   unique_grad->indices_size_ = unique_indices_size;
 }

+struct WorkerParamsForReduceSparseGradient {
+  size_t slice_start_{0};
+  size_t slice_end_{0};
+  size_t max_length_{0};
+  size_t outer_dim_{0};
+  std::vector<std::pair<int, size_t>> *sorted_indices_{nullptr};
+  std::vector<size_t> *slice_positions_{nullptr};
+  float *src_value_{nullptr};
+  SparseGradient *unique_grad_{nullptr};
+};
+
+void WorkerForReduceSparseGradient(WorkerParamsForReduceSparseGradient param) {
+  MS_EXCEPTION_IF_NULL(param.sorted_indices_);
+  MS_EXCEPTION_IF_NULL(param.slice_positions_);
+  MS_EXCEPTION_IF_NULL(param.src_value_);
+  MS_EXCEPTION_IF_NULL(param.unique_grad_);
+  auto outer_dim = param.outer_dim_;
+  auto &sorted_indices = *(param.sorted_indices_);
+  auto &slice_positions = *(param.slice_positions_);
+  auto unique_grad = param.unique_grad_;
+  for (size_t slice_id = param.slice_start_; slice_id < param.slice_end_; ++slice_id) {
+    size_t cur_pos = slice_positions[slice_id];
+    int index = sorted_indices[cur_pos].first;
+    unique_grad->indices_[slice_id] = index;
+    size_t start_index = slice_id * outer_dim;
+    auto ret_code = memcpy_s(unique_grad->value_ + start_index, (param.max_length_ - start_index) * sizeof(float),
+                             param.src_value_ + sorted_indices[cur_pos].second, outer_dim * sizeof(float));
+    if (ret_code != EOK) {
+      MS_LOG(EXCEPTION) << "Failed to copy data!";
+    }
+    cur_pos++;
+    size_t end_pos;
+    if (slice_id + 1 < slice_positions.size()) {
+      end_pos = slice_positions[slice_id + 1];
+    } else {
+      end_pos = sorted_indices.size();
+    }
+    while (cur_pos < end_pos) {
+      for (size_t i = 0; i < outer_dim; ++i) {
+        unique_grad->value_[start_index + i] += param.src_value_[sorted_indices[cur_pos].second + i];
+      }
+      cur_pos++;
+    }
+  }
+}
+
 void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim,
                           size_t outer_dim) {
   MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_);
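Each worker owns the half-open slice range [slice_start_, slice_end_): for every slice it copies the row at the slice's first sorted position, then adds each following row that shares the same index. A single-threaded sketch of that per-slice reduction, with plain std::vector and std::memcpy standing in for SparseGradient and memcpy_s (names here are illustrative, not from the commit):

#include <cstring>
#include <iostream>
#include <utility>
#include <vector>

// Toy version of one worker's job: for each slice [slice_positions[s], next
// slice start), copy the first row for that index, then accumulate the
// remaining rows that share the same index.
void ReduceSlices(const std::vector<std::pair<int, size_t>> &sorted_indices,
                  const std::vector<size_t> &slice_positions, const std::vector<float> &src,
                  size_t outer_dim, std::vector<int> *out_indices, std::vector<float> *out_values) {
  for (size_t s = 0; s < slice_positions.size(); ++s) {
    size_t cur = slice_positions[s];
    size_t end = (s + 1 < slice_positions.size()) ? slice_positions[s + 1] : sorted_indices.size();
    (*out_indices)[s] = sorted_indices[cur].first;
    float *dst = out_values->data() + s * outer_dim;
    std::memcpy(dst, src.data() + sorted_indices[cur].second, outer_dim * sizeof(float));
    for (++cur; cur < end; ++cur) {
      for (size_t i = 0; i < outer_dim; ++i) {
        dst[i] += src[sorted_indices[cur].second + i];
      }
    }
  }
}

int main() {
  // Rows for indices {1, 1, 3}: the two rows for index 1 get summed.
  std::vector<std::pair<int, size_t>> sorted_indices = {{1, 0}, {1, 2}, {3, 4}};
  std::vector<size_t> slice_positions = {0, 2};  // slice starts for unique indices 1 and 3
  std::vector<float> src = {1.0f, 2.0f, 10.0f, 20.0f, 5.0f, 6.0f};
  std::vector<int> out_indices(2);
  std::vector<float> out_values(4);
  ReduceSlices(sorted_indices, slice_positions, src, 2, &out_indices, &out_values);
  for (size_t s = 0; s < out_indices.size(); ++s) {
    std::cout << out_indices[s] << ": " << out_values[s * 2] << ", " << out_values[s * 2 + 1] << "\n";
  }
  // Prints: 1: 11, 22  then  3: 5, 6
}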
@@ -584,47 +630,50 @@ void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradie
   MS_EXCEPTION_IF_NULL(unique_grad);
   MS_EXCEPTION_IF_NULL(unique_grad->value_);
   MS_EXCEPTION_IF_NULL(unique_grad->indices_);
-  size_t unique_indices_size = 0;
   std::vector<std::pair<int, size_t>> sorted_indices;
   sorted_indices.reserve(origin_sparse_grad.indices_size_);
   for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) {
     int index = origin_sparse_grad.indices_[i];
-    if (index < 0 || IntToSize(index) >= first_dim) {
-      continue;
+    if (index >= 0 && IntToSize(index) < first_dim) {
+      sorted_indices.emplace_back(std::pair<int, size_t>(index, i * outer_dim));
     }
-    sorted_indices.emplace_back(std::pair<int, size_t>(index, i * outer_dim));
   }
   std::sort(
     sorted_indices.begin(), sorted_indices.end(),
     [](const std::pair<int, size_t> &left, const std::pair<int, size_t> &right) { return left.first < right.first; });

   int last_index = 0;
-  size_t indices_size = sorted_indices.size();
-  size_t start_index = 0;
-  size_t end_index = outer_dim;
-  size_t dst_len = indices_size * outer_dim;
-  for (size_t i = 0; i < indices_size; ++i) {
-    int index = sorted_indices[i].first;
-    if (i == 0 || last_index != index) {
-      if (i > 0 && last_index != index) {
-        unique_indices_size++;
-        start_index += outer_dim;
-        end_index += outer_dim;
-      }
-      unique_grad->indices_[unique_indices_size] = index;
-      auto ret_code = memcpy_s(unique_grad->value_ + start_index, dst_len - start_index,
-                               origin_sparse_grad.value_ + sorted_indices[i].second, outer_dim);
-      if (ret_code != EOK) {
-        MS_LOG(EXCEPTION) << "Failed to copy data!";
-      }
-    } else {
-      for (size_t j = start_index, k = sorted_indices[i].second; j < end_index; ++j, ++k) {
-        unique_grad->value_[j] += origin_sparse_grad.value_[k];
-      }
+  std::vector<size_t> slice_positions;
+  for (size_t i = 0; i < sorted_indices.size(); ++i) {
+    if (i == 0 || last_index != sorted_indices[i].first) {
+      slice_positions.emplace_back(i);
     }
-    last_index = index;
+    last_index = sorted_indices[i].first;
   }
-  unique_grad->indices_size_ = unique_indices_size + 1;
+  size_t thread_num = 8;
+  if (slice_positions.size() < thread_num) {
+    thread_num = slice_positions.size();
+  }
+  size_t stride = (slice_positions.size() + thread_num - 1) / thread_num;
+  thread_num = (slice_positions.size() + stride - 1) / stride;
+  std::vector<std::thread> threads;
+  size_t max_length = sorted_indices.size() * outer_dim;
+  for (size_t i = 0; i < thread_num; ++i) {
+    size_t slice_start = i * stride;
+    size_t slice_end = 0;
+    if (i == thread_num - 1) {
+      slice_end = slice_positions.size();
+    } else {
+      slice_end = slice_start + stride;
+    }
+    WorkerParamsForReduceSparseGradient params{
+      slice_start, slice_end, max_length, outer_dim, &sorted_indices, &slice_positions, origin_sparse_grad.value_,
+      unique_grad};
+    threads.emplace_back(std::thread(WorkerForReduceSparseGradient, params));
+  }
+  for (size_t i = 0; i < thread_num; ++i) {
+    threads[i].join();
+  }
+  unique_grad->indices_size_ = slice_positions.size();
 }

 std::pair<AnfNodePtr, size_t> GetKernelInput(const AnfNodePtr &anf_node, size_t index) {
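The thread partition in the new ReduceSparseGradient is a ceiling division followed by a shrink so that no thread receives an empty range: with 20 unique slices and thread_num = 8, stride = (20 + 7) / 8 = 3 and thread_num becomes (20 + 2) / 3 = 7, so six threads take 3 slices each and the seventh takes the final 2. A small self-contained sketch of that arithmetic (PartitionSlices is an assumed name, not part of the commit):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Mirrors the partition arithmetic in ReduceSparseGradient: ceiling-divide the
// slices across at most max_threads workers, then shrink the thread count so
// that every worker owns a non-empty [start, end) range.
std::vector<std::pair<size_t, size_t>> PartitionSlices(size_t slice_count, size_t max_threads) {
  std::vector<std::pair<size_t, size_t>> ranges;
  size_t thread_num = std::min(slice_count, max_threads);
  if (thread_num == 0) {
    return ranges;
  }
  size_t stride = (slice_count + thread_num - 1) / thread_num;  // ceil(slice_count / thread_num)
  thread_num = (slice_count + stride - 1) / stride;             // drop threads that would be empty
  for (size_t i = 0; i < thread_num; ++i) {
    size_t start = i * stride;
    size_t end = (i == thread_num - 1) ? slice_count : start + stride;
    ranges.emplace_back(start, end);
  }
  return ranges;
}

int main() {
  // 20 unique indices, 8 threads: stride = 3, so only 7 threads are used.
  for (const auto &r : PartitionSlices(20, 8)) {
    std::cout << "[" << r.first << ", " << r.second << ")\n";
  }
}

Recomputing thread_num after the ceiling division is what prevents idle or out-of-range workers when stride rounds up.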