From 891b80b9f580e0f942df84571eaf97ec47f0aa7c Mon Sep 17 00:00:00 2001 From: kswang Date: Thu, 16 Jul 2020 11:58:54 +0800 Subject: [PATCH] add bucket reduce sparse gradient --- .../backend/kernel_compiler/common_utils.cc | 511 +++++++++++------- .../backend/kernel_compiler/common_utils.h | 28 +- .../cpu/sparse_apply_adam_cpu_kernel.cc | 18 +- .../cpu/sparse_apply_ftrl_cpu_kernel.cc | 17 +- .../cpu/sparse_apply_lazy_adam_cpu_kernel.cc | 16 +- ...parse_apply_proximal_adagrad_cpu_kernel.cc | 16 +- .../ccsrc/backend/session/cpu_session.cc | 45 +- tests/ut/cpp/kernel/common_utils_test.cc | 67 ++- 8 files changed, 465 insertions(+), 253 deletions(-) diff --git a/mindspore/ccsrc/backend/kernel_compiler/common_utils.cc b/mindspore/ccsrc/backend/kernel_compiler/common_utils.cc index f4495cdb9df..ef2f75ee6e1 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/common_utils.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/common_utils.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "nlohmann/json.hpp" #include "backend/session/anf_runtime_algorithm.h" @@ -499,235 +500,329 @@ int Sign(float x) { return 0; } -void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim) { - MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_); - MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_); - MS_EXCEPTION_IF_NULL(unique_grad); - MS_EXCEPTION_IF_NULL(unique_grad->value_); - MS_EXCEPTION_IF_NULL(unique_grad->indices_); +namespace { +struct BucketSparseGradient { + float *value_; + int *indices_; + int *global_indices_; + size_t indices_size_; +}; + +struct MultiThreadReduceSparseGradientParam { + SparseGradient *input_grad_{nullptr}; + SparseGradient *workspace_grad_{nullptr}; + SparseGradient *output_grad_{nullptr}; + size_t max_index_{0}; + size_t value_stride_{0}; + size_t thread_num_{0}; + bool use_sort_reduce_{false}; +}; + +void CalculateEachBucketSize(const std::shared_ptr &sparse_grad, size_t max_index, + std::vector *each_bucket_size) { + MS_LOG(DEBUG) << "Start"; + MS_EXCEPTION_IF_NULL(sparse_grad); + MS_EXCEPTION_IF_NULL(sparse_grad->indices_); + MS_EXCEPTION_IF_NULL(each_bucket_size); + size_t bucket_num = each_bucket_size->size(); + for (size_t i = 0; i < sparse_grad->indices_size_; ++i) { + int index = sparse_grad->indices_[i]; + if (index >= 0 && IntToSize(index) < max_index) { + auto bucket_id = index % bucket_num; + each_bucket_size->at(bucket_id)++; + } + } + MS_LOG(DEBUG) << "End"; +} + +void SplitAndCalculateSegmentBucketSize(const MultiThreadReduceSparseGradientParam ¶m, + std::vector> *segments_ptr, + std::vector>> *segment_bucket_sizes_ptr) { + MS_EXCEPTION_IF_NULL(param.input_grad_); + MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr); + MS_EXCEPTION_IF_NULL(segments_ptr); + auto &segments = *segments_ptr; + auto &segment_bucket_sizes = *segment_bucket_sizes_ptr; + auto input_grad = param.input_grad_; + if (param.thread_num_ < 1) { + MS_EXCEPTION(ArgumentError) << "Input param thread num must > 0!"; + } + size_t thread_indices_size = input_grad->indices_size_ / param.thread_num_; + size_t left_indices_size = input_grad->indices_size_ % param.thread_num_; + std::vector threads; + threads.reserve(param.thread_num_); + segments.reserve(param.thread_num_); + + size_t current_indices_offset = 0; + for (size_t i = 0; i < param.thread_num_; ++i) { + segment_bucket_sizes.emplace_back(std::make_shared>(param.thread_num_, 0)); + size_t indices_size = thread_indices_size; + if (i < left_indices_size) { + indices_size += 1; + } + segments.emplace_back(std::make_shared()); + segments[i]->value_ = input_grad->value_ + current_indices_offset * param.value_stride_; + segments[i]->indices_ = input_grad->indices_ + current_indices_offset; + segments[i]->indices_size_ = indices_size; + threads.emplace_back( + std::thread(CalculateEachBucketSize, segments[i], param.max_index_, segment_bucket_sizes[i].get())); + current_indices_offset += indices_size; + } + + for (size_t i = 0; i < param.thread_num_; ++i) { + threads[i].join(); + } +} + +void CopySegmentIndicesToBucket(const MultiThreadReduceSparseGradientParam ¶m, + const std::shared_ptr &segment, size_t bucket_offset, + const std::vector> &buckets) { + MS_LOG(DEBUG) << "Start"; + MS_EXCEPTION_IF_NULL(segment); + MS_EXCEPTION_IF_NULL(segment->indices_); + std::vector bucket_data_num(param.thread_num_, 0); + for (size_t i = 0; i < segment->indices_size_; ++i) { + int index = segment->indices_[i]; + if (index >= 0 && IntToSize(index) < param.max_index_) { + auto bucket_id = index % param.thread_num_; + auto bucket_index = bucket_data_num[bucket_id]; + buckets[bucket_id]->indices_[bucket_index] = index; + buckets[bucket_id]->global_indices_[bucket_index] = bucket_offset + i; + bucket_data_num[bucket_id]++; + } + } + MS_LOG(DEBUG) << "End"; +} + +void GatherSegmentIndicesToOutputBucket(const MultiThreadReduceSparseGradientParam ¶m, + const std::vector> &segments, + const std::vector>> &segment_bucket_sizes, + std::vector> *buckets_ptr) { + MS_EXCEPTION_IF_NULL(param.output_grad_); + MS_EXCEPTION_IF_NULL(param.output_grad_->value_); + MS_EXCEPTION_IF_NULL(param.output_grad_->indices_); + MS_EXCEPTION_IF_NULL(buckets_ptr); + auto &buckets = *buckets_ptr; + size_t thread_num = param.thread_num_; + if (thread_num != segment_bucket_sizes.size()) { + MS_EXCEPTION(ArgumentError) << "Input param thread num not equal to segment size!"; + } + std::vector bucket_data_size(thread_num, 0); + for (size_t i = 0; i < thread_num; ++i) { + for (size_t j = 0; j < thread_num; ++j) { + bucket_data_size[j] += segment_bucket_sizes[i]->at(j); + } + } + size_t current_indices_offset = 0; + for (size_t i = 0; i < thread_num; ++i) { + buckets.emplace_back(std::make_shared()); + buckets[i]->value_ = param.output_grad_->value_ + current_indices_offset * param.value_stride_; + buckets[i]->indices_ = param.output_grad_->indices_ + current_indices_offset; + buckets[i]->global_indices_ = param.workspace_grad_->indices_ + current_indices_offset; + buckets[i]->indices_size_ = bucket_data_size[i]; + current_indices_offset += bucket_data_size[i]; + } + std::vector tmp_bucket_data_size(thread_num, 0); + std::vector>> each_thread_buckets; + for (size_t i = 0; i < thread_num; ++i) { + std::vector> thread_buckets; + for (size_t j = 0; j < thread_num; ++j) { + thread_buckets.emplace_back(std::make_shared()); + thread_buckets[j]->indices_ = buckets[j]->indices_ + tmp_bucket_data_size[j]; + thread_buckets[j]->global_indices_ = buckets[j]->global_indices_ + tmp_bucket_data_size[j]; + thread_buckets[j]->value_ = buckets[j]->value_ + tmp_bucket_data_size[j] * param.value_stride_; + thread_buckets[j]->indices_size_ = segment_bucket_sizes[i]->at(j); + tmp_bucket_data_size[j] += segment_bucket_sizes[i]->at(j); + } + each_thread_buckets.emplace_back(thread_buckets); + } + std::vector threads; + threads.reserve(thread_num); + current_indices_offset = 0; + for (size_t i = 0; i < thread_num; ++i) { + threads.emplace_back( + std::thread(CopySegmentIndicesToBucket, param, segments[i], current_indices_offset, each_thread_buckets[i])); + current_indices_offset += segments[i]->indices_size_; + } + for (size_t i = 0; i < thread_num; ++i) { + threads[i].join(); + } +} + +void SortAndReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam ¶m, + const std::shared_ptr &bucket, + const std::shared_ptr &reduced_bucket) { + MS_LOG(DEBUG) << "Start"; + MS_EXCEPTION_IF_NULL(bucket); + MS_EXCEPTION_IF_NULL(bucket->value_); + MS_EXCEPTION_IF_NULL(bucket->indices_); + MS_EXCEPTION_IF_NULL(reduced_bucket); + MS_EXCEPTION_IF_NULL(reduced_bucket->value_); + MS_EXCEPTION_IF_NULL(reduced_bucket->indices_); + std::vector> sorted_indices; + sorted_indices.reserve(bucket->indices_size_); + for (size_t i = 0; i < bucket->indices_size_; ++i) { + int index = bucket->indices_[i]; + int global_index = bucket->global_indices_[i]; + sorted_indices.emplace_back(std::pair(index, global_index)); + } + std::sort(sorted_indices.begin(), sorted_indices.end()); + + float *global_value = param.input_grad_->value_; + size_t unique_indices_size = 0; + size_t max_length = reduced_bucket->indices_size_ * param.value_stride_; + int last_index{0}; + size_t value_offset{0}; + for (size_t i = 0; i < sorted_indices.size(); ++i) { + int index = sorted_indices[i].first; + int global_index = sorted_indices[i].second; + int global_value_offset = global_index * param.value_stride_; + if (i == 0 || index != last_index) { + if (i != 0) { + unique_indices_size++; + } + reduced_bucket->indices_[unique_indices_size] = index; + value_offset = unique_indices_size * param.value_stride_; + auto ret_code = memcpy_s(reduced_bucket->value_ + value_offset, (max_length - value_offset) * sizeof(float), + global_value + global_value_offset, param.value_stride_ * sizeof(float)); + if (ret_code != EOK) { + MS_LOG(EXCEPTION) << "Failed to copy data!"; + } + } else { + for (size_t j = 0; j < param.value_stride_; ++j) { + reduced_bucket->value_[value_offset + j] += global_value[global_value_offset + j]; + } + } + last_index = index; + } + reduced_bucket->indices_size_ = unique_indices_size; + MS_LOG(DEBUG) << "End"; +} + +void ReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam ¶m, + const std::shared_ptr &bucket, + const std::shared_ptr &reduced_bucket) { + MS_LOG(DEBUG) << "Start"; + MS_EXCEPTION_IF_NULL(bucket); + MS_EXCEPTION_IF_NULL(bucket->value_); + MS_EXCEPTION_IF_NULL(bucket->indices_); + MS_EXCEPTION_IF_NULL(reduced_bucket); + MS_EXCEPTION_IF_NULL(reduced_bucket->value_); + MS_EXCEPTION_IF_NULL(reduced_bucket->indices_); + + float *global_value = param.input_grad_->value_; std::unordered_map index_map; size_t unique_indices_size = 0; - for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) { - int index = origin_sparse_grad.indices_[i]; - if (index < 0 || IntToSize(index) >= first_dim) { - continue; - } + size_t max_length = reduced_bucket->indices_size_ * param.value_stride_; + for (size_t i = 0; i < bucket->indices_size_; ++i) { + int index = bucket->indices_[i]; + int global_index = bucket->global_indices_[i]; auto iter = index_map.find(index); if (iter == index_map.end()) { - index_map[index] = unique_indices_size; - unique_grad->indices_[unique_indices_size] = index; - size_t start_index = unique_indices_size * outer_dim; - size_t end_index = start_index + outer_dim; - for (size_t j = start_index, k = i * outer_dim; j < end_index; ++j, ++k) { - unique_grad->value_[j] = origin_sparse_grad.value_[k]; + reduced_bucket->indices_[unique_indices_size] = index; + size_t start_index = unique_indices_size * param.value_stride_; + index_map[index] = start_index; + auto ret_code = memcpy_s(reduced_bucket->value_ + start_index, (max_length - start_index) * sizeof(float), + global_value + global_index * param.value_stride_, param.value_stride_ * sizeof(float)); + if (ret_code != EOK) { + MS_LOG(EXCEPTION) << "Failed to copy data!"; } unique_indices_size++; } else { - size_t first_index = iter->second; - size_t start_index = first_index * outer_dim; - size_t end_index = start_index + outer_dim; - for (size_t j = start_index, k = i * outer_dim; j < end_index; ++j, ++k) { - unique_grad->value_[j] += origin_sparse_grad.value_[k]; + size_t start_index = iter->second; + size_t end_index = start_index + param.value_stride_; + for (size_t j = start_index, k = global_index * param.value_stride_; j < end_index; ++j, ++k) { + reduced_bucket->value_[j] += global_value[k]; } } } - unique_grad->indices_size_ = unique_indices_size; -} - -struct WorkerParamsForReduceSparseGradient { - size_t slice_start_{0}; - size_t slice_end_{0}; - size_t max_length_{0}; - size_t outer_dim_{0}; - std::vector> *sorted_indices_{nullptr}; - std::vector *slice_positions_{nullptr}; - float *src_value_{nullptr}; - SparseGradient *unique_grad_{nullptr}; -}; - -void WorkerForReduceSparseGradient(WorkerParamsForReduceSparseGradient param) { - MS_EXCEPTION_IF_NULL(param.sorted_indices_); - MS_EXCEPTION_IF_NULL(param.slice_positions_); - MS_EXCEPTION_IF_NULL(param.src_value_); - MS_EXCEPTION_IF_NULL(param.unique_grad_); - auto outer_dim = param.outer_dim_; - auto &sorted_indices = *(param.sorted_indices_); - auto &slice_positions = *(param.slice_positions_); - auto unique_grad = param.unique_grad_; - for (size_t slice_id = param.slice_start_; slice_id < param.slice_end_; ++slice_id) { - size_t cur_pos = slice_positions[slice_id]; - int index = sorted_indices[cur_pos].first; - unique_grad->indices_[slice_id] = index; - size_t start_index = slice_id * outer_dim; - auto ret_code = memcpy_s(unique_grad->value_ + start_index, (param.max_length_ - start_index) * sizeof(float), - param.src_value_ + sorted_indices[cur_pos].second, outer_dim * sizeof(float)); - if (ret_code != EOK) { - MS_LOG(EXCEPTION) << "Failed to copy data!"; - } - cur_pos++; - size_t end_pos; - if (slice_id + 1 < slice_positions.size()) { - end_pos = slice_positions[slice_id + 1]; - } else { - end_pos = sorted_indices.size(); - } - while (cur_pos < end_pos) { - for (size_t i = 0; i < outer_dim; ++i) { - unique_grad->value_[start_index + i] += param.src_value_[sorted_indices[cur_pos].second + i]; - } - cur_pos++; - } - } -} - -void RunMultiThreadReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, - size_t outer_dim, std::vector> *sorted_indices, - std::vector *slice_positions) { - MS_LOG(DEBUG) << "Start"; - size_t thread_num = 24; - if (slice_positions->size() < thread_num) { - thread_num = slice_positions->size(); - } - size_t stride = (slice_positions->size() + thread_num - 1) / thread_num; - thread_num = (slice_positions->size() + stride - 1) / stride; - std::vector threads; - size_t max_length = sorted_indices->size() * outer_dim; - for (size_t i = 0; i < thread_num; ++i) { - size_t slice_start = i * stride; - size_t slice_end = 0; - if (i == thread_num - 1) { - slice_end = slice_positions->size(); - } else { - slice_end = slice_start + stride; - } - WorkerParamsForReduceSparseGradient params{ - slice_start, slice_end, max_length, outer_dim, sorted_indices, slice_positions, origin_sparse_grad.value_, - unique_grad}; - threads.emplace_back(std::thread(WorkerForReduceSparseGradient, params)); - } - for (size_t i = 0; i < thread_num; ++i) { - threads[i].join(); - } + reduced_bucket->indices_size_ = unique_indices_size; MS_LOG(DEBUG) << "End"; } -void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim, bool use_multi_threads) { - MS_LOG(DEBUG) << "Start"; - MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_); - MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_); - MS_EXCEPTION_IF_NULL(unique_grad); - MS_EXCEPTION_IF_NULL(unique_grad->value_); - MS_EXCEPTION_IF_NULL(unique_grad->indices_); - std::vector> sorted_indices; - sorted_indices.reserve(origin_sparse_grad.indices_size_); - for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) { - int index = origin_sparse_grad.indices_[i]; - if (index >= 0 && IntToSize(index) < first_dim) { - sorted_indices.emplace_back(std::pair(index, i * outer_dim)); - } - } - std::sort( - sorted_indices.begin(), sorted_indices.end(), - [](const std::pair &left, const std::pair &right) { return left.first < right.first; }); - int last_index = 0; - std::vector slice_positions; - slice_positions.reserve(sorted_indices.size()); - for (size_t i = 0; i < sorted_indices.size(); ++i) { - if (i == 0 || last_index != sorted_indices[i].first) { - slice_positions.emplace_back(i); - } - last_index = sorted_indices[i].first; - } - if (use_multi_threads) { - RunMultiThreadReduceSparseGradient(origin_sparse_grad, unique_grad, outer_dim, &sorted_indices, &slice_positions); - } else { - size_t max_length = sorted_indices.size() * outer_dim; - WorkerParamsForReduceSparseGradient params{0, - slice_positions.size(), - max_length, - outer_dim, - &sorted_indices, - &slice_positions, - origin_sparse_grad.value_, - unique_grad}; - WorkerForReduceSparseGradient(params); - } - unique_grad->indices_size_ = slice_positions.size(); - MS_LOG(DEBUG) << "End"; -} - -void ReduceMultiSparseGradient(const std::vector> &unique_slice_grads, - SparseGradient *tmp_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim) { - MS_LOG(DEBUG) << "Start"; - if (unique_slice_grads.empty()) { - return; - } - size_t index_data_size = outer_dim * sizeof(float); - size_t unique_indices_size = 0; - for (size_t i = 0; i < unique_slice_grads.size(); ++i) { - auto &slice_grad = unique_slice_grads[i]; - auto ret_code = memcpy_s(tmp_grad->value_ + unique_indices_size * outer_dim, - (tmp_grad->indices_size_ - unique_indices_size) * index_data_size, slice_grad->value_, - slice_grad->indices_size_ * index_data_size); - if (ret_code != EOK) { - MS_LOG(EXCEPTION) << "Failed to copy data!"; - } - ret_code = - memcpy_s(tmp_grad->indices_ + unique_indices_size, (tmp_grad->indices_size_ - unique_indices_size) * sizeof(int), - slice_grad->indices_, slice_grad->indices_size_ * sizeof(int)); - if (ret_code != EOK) { - MS_LOG(EXCEPTION) << "Failed to copy data!"; - } - unique_indices_size += slice_grad->indices_size_; - } - tmp_grad->indices_size_ = unique_indices_size; - ReduceSparseGradient(*tmp_grad, unique_grad, first_dim, outer_dim); - MS_LOG(DEBUG) << "End"; -} - -void TwoLevelReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *tmp_grad, - SparseGradient *unique_grad, size_t first_dim, size_t outer_dim) { - MS_LOG(DEBUG) << "Start"; - MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_); - MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_); - MS_EXCEPTION_IF_NULL(unique_grad); - MS_EXCEPTION_IF_NULL(unique_grad->value_); - MS_EXCEPTION_IF_NULL(unique_grad->indices_); - MS_EXCEPTION_IF_NULL(tmp_grad); - MS_EXCEPTION_IF_NULL(tmp_grad->value_); - MS_EXCEPTION_IF_NULL(tmp_grad->indices_); - size_t thread_num = 24; - if (origin_sparse_grad.indices_size_ < thread_num) { - thread_num = origin_sparse_grad.indices_size_; - } - size_t thread_indices_size = origin_sparse_grad.indices_size_ / thread_num; - size_t left_indices_size = origin_sparse_grad.indices_size_ % thread_num; +void ReduceBucketSparseGradientToWorkspace(const MultiThreadReduceSparseGradientParam ¶m, + const std::vector> &buckets, + std::vector> *reduced_buckets_ptr) { + MS_EXCEPTION_IF_NULL(param.workspace_grad_); + MS_EXCEPTION_IF_NULL(param.workspace_grad_->value_); + MS_EXCEPTION_IF_NULL(param.workspace_grad_->indices_); + MS_EXCEPTION_IF_NULL(reduced_buckets_ptr); + auto &reduced_buckets = *reduced_buckets_ptr; + size_t thread_num = buckets.size(); std::vector threads; threads.reserve(thread_num); - std::vector> unique_slice_grads; + + size_t current_indices_offset = 0; for (size_t i = 0; i < thread_num; ++i) { - size_t indices_size = thread_indices_size; - if (i == thread_num - 1) { - indices_size = thread_indices_size + left_indices_size; + reduced_buckets.emplace_back(std::make_shared()); + reduced_buckets[i]->value_ = param.workspace_grad_->value_ + current_indices_offset * param.value_stride_; + reduced_buckets[i]->indices_ = param.workspace_grad_->indices_ + current_indices_offset; + reduced_buckets[i]->indices_size_ = buckets[i]->indices_size_; + if (param.use_sort_reduce_) { + threads.emplace_back(std::thread(SortAndReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i])); + } else { + threads.emplace_back(std::thread(ReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i])); } - size_t value_offset = i * thread_indices_size * outer_dim; - size_t indices_offset = i * thread_indices_size; - auto slice_grad = SparseGradient( - {origin_sparse_grad.value_ + value_offset, origin_sparse_grad.indices_ + indices_offset, indices_size}); - unique_slice_grads.emplace_back(std::make_shared()); - unique_slice_grads[i]->value_ = unique_grad->value_ + value_offset; - unique_slice_grads[i]->indices_ = unique_grad->indices_ + indices_offset; - unique_slice_grads[i]->indices_size_ = indices_size; - threads.emplace_back( - std::thread(ReduceSparseGradient, slice_grad, unique_slice_grads[i].get(), first_dim, outer_dim, false)); + current_indices_offset += buckets[i]->indices_size_; } for (size_t i = 0; i < thread_num; ++i) { threads[i].join(); } - ReduceMultiSparseGradient(unique_slice_grads, tmp_grad, unique_grad, first_dim, outer_dim); +} + +void MergeReduceSparseGradient(const MultiThreadReduceSparseGradientParam ¶m, + const std::vector> &reduced_buckets) { + MS_EXCEPTION_IF_NULL(param.output_grad_); + auto output_grad = param.output_grad_; + MS_EXCEPTION_IF_NULL(output_grad->value_); + MS_EXCEPTION_IF_NULL(output_grad->indices_); + size_t stride_data_size = param.value_stride_ * sizeof(float); + size_t unique_indices_size = 0; + for (size_t i = 0; i < reduced_buckets.size(); ++i) { + auto &bucket = reduced_buckets[i]; + MS_EXCEPTION_IF_NULL(bucket); + if (bucket->indices_size_ == 0) { + continue; + } + auto ret_code = memcpy_s(output_grad->value_ + unique_indices_size * param.value_stride_, + (output_grad->indices_size_ - unique_indices_size) * stride_data_size, bucket->value_, + bucket->indices_size_ * stride_data_size); + if (ret_code != EOK) { + MS_LOG(EXCEPTION) << "Failed to copy data!"; + } + ret_code = memcpy_s(output_grad->indices_ + unique_indices_size, + (output_grad->indices_size_ - unique_indices_size) * sizeof(int), bucket->indices_, + bucket->indices_size_ * sizeof(int)); + if (ret_code != EOK) { + MS_LOG(EXCEPTION) << "Failed to copy data!"; + } + unique_indices_size += bucket->indices_size_; + } + output_grad->indices_size_ = unique_indices_size; +} +} // namespace + +void BucketReduceSparseGradient(const ReduceSparseGradientParam ¶m) { + MS_LOG(DEBUG) << "Start"; + MS_EXCEPTION_IF_NULL(param.input_grad_); + size_t thread_num = 23; + if (param.input_grad_->indices_size_ < thread_num) { + thread_num = param.input_grad_->indices_size_; + } + MultiThreadReduceSparseGradientParam multi_thread_param({param.input_grad_, param.workspace_grad_, param.output_grad_, + param.max_index_, param.value_stride_, thread_num, + param.use_sort_reduce_}); + std::vector> segments; + std::vector>> segment_bucket_sizes; + SplitAndCalculateSegmentBucketSize(multi_thread_param, &segments, &segment_bucket_sizes); + + std::vector> buckets; + GatherSegmentIndicesToOutputBucket(multi_thread_param, segments, segment_bucket_sizes, &buckets); + + std::vector> reduced_buckets; + ReduceBucketSparseGradientToWorkspace(multi_thread_param, buckets, &reduced_buckets); + + MergeReduceSparseGradient(multi_thread_param, reduced_buckets); MS_LOG(DEBUG) << "End"; } diff --git a/mindspore/ccsrc/backend/kernel_compiler/common_utils.h b/mindspore/ccsrc/backend/kernel_compiler/common_utils.h index 8c9ea84b34e..4f48d70b2cc 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/common_utils.h +++ b/mindspore/ccsrc/backend/kernel_compiler/common_utils.h @@ -73,9 +73,18 @@ class KernelMeta { }; struct SparseGradient { - float *value_; - int *indices_; - size_t indices_size_; + float *value_{nullptr}; + int *indices_{nullptr}; + size_t indices_size_{0}; +}; + +struct ReduceSparseGradientParam { + SparseGradient *input_grad_{nullptr}; + SparseGradient *workspace_grad_{nullptr}; + SparseGradient *output_grad_{nullptr}; + size_t max_index_{0}; + size_t value_stride_{0}; + bool use_sort_reduce_{false}; }; struct MultiThreadComputeParams { @@ -112,10 +121,6 @@ void SaveJsonInfo(const std::string &json_name, const std::string &info); std::string GetProcessor(const AnfNodePtr &anf_node); bool IsSameShape(const std::vector &shape_a, const std::vector &shape_b); int Sign(float x); -void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim); -void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim, bool use_multi_threads = true); std::pair GetKernelInput(const AnfNodePtr &anf_node, size_t index); std::vector>> GetInputIndex(const std::vector &node_list, const std::vector &input_list); @@ -130,14 +135,7 @@ void GetGraphRealOutput(const FuncGraphPtr &func_graph, std::vector> *sorted_indices, - std::vector *slice_positions); -void ReduceMultiSparseGradient(const std::vector> &unique_slice_grads, - SparseGradient *tmp_grad, SparseGradient *unique_grad, size_t first_dim, - size_t outer_dim); -void TwoLevelReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *tmp_grad, - SparseGradient *unique_grad, size_t first_dim, size_t outer_dim); +void BucketReduceSparseGradient(const ReduceSparseGradientParam ¶m); std::vector GetReduceAttrAxis(const CNodePtr &cnode); } // namespace kernel } // namespace mindspore diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_adam_cpu_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_adam_cpu_kernel.cc index 2ff8e77fcd1..9a247611e7f 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_adam_cpu_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_adam_cpu_kernel.cc @@ -81,6 +81,8 @@ void SparseApplyAdamCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node) MS_EXCEPTION_IF_NULL(kernel_node); workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float)); workspace_size_list_.emplace_back(indices_size_ * sizeof(int)); + workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float)); + workspace_size_list_.emplace_back(indices_size_ * sizeof(int)); workspace_size_list_.emplace_back(var_first_dim_size_ * var_outer_dim_size_ * sizeof(float)); } @@ -142,11 +144,21 @@ bool SparseApplyAdamCPUKernel::Launch(const std::vector &inp auto indices = reinterpret_cast(inputs[10]->addr); auto new_grad = reinterpret_cast(workspace[0]->addr); auto new_indices = reinterpret_cast(workspace[1]->addr); - auto m_t = reinterpret_cast(workspace[2]->addr); + auto workspace_grad = reinterpret_cast(workspace[2]->addr); + auto workspace_indices = reinterpret_cast(workspace[3]->addr); + auto m_t = reinterpret_cast(workspace[4]->addr); SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_}); - ReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &unique_sparse_grad, var_first_dim_size_, - var_outer_dim_size_); + SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_}); + SparseGradient input_sparse_grad({grad, indices, indices_size_}); + ReduceSparseGradientParam param; + param.input_grad_ = &input_sparse_grad; + param.workspace_grad_ = &workspace_sparse_grad; + param.output_grad_ = &unique_sparse_grad; + param.max_index_ = var_first_dim_size_; + param.value_stride_ = var_outer_dim_size_; + BucketReduceSparseGradient(param); + size_t total_dim_size = var_first_dim_size_ * var_outer_dim_size_; lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power); diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_ftrl_cpu_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_ftrl_cpu_kernel.cc index 2662604e196..1a1405f3b39 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_ftrl_cpu_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_ftrl_cpu_kernel.cc @@ -132,12 +132,19 @@ bool SparseApplyFtrlCPUKernel::Launch(const std::vector &inp auto indices = reinterpret_cast(inputs[4]->addr); auto new_grad = reinterpret_cast(workspace[0]->addr); auto new_indices = reinterpret_cast(workspace[1]->addr); - auto tmp_grad = reinterpret_cast(workspace[2]->addr); - auto tmp_indices = reinterpret_cast(workspace[3]->addr); + auto workspace_grad = reinterpret_cast(workspace[2]->addr); + auto workspace_indices = reinterpret_cast(workspace[3]->addr); + SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_}); - SparseGradient tmp_sparse_grad({tmp_grad, tmp_indices, indices_size_}); - TwoLevelReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &tmp_sparse_grad, &unique_sparse_grad, - var_first_dim_size_, var_outer_dim_size_); + SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_}); + SparseGradient input_sparse_grad({grad, indices, indices_size_}); + ReduceSparseGradientParam param; + param.input_grad_ = &input_sparse_grad; + param.workspace_grad_ = &workspace_sparse_grad; + param.output_grad_ = &unique_sparse_grad; + param.max_index_ = var_first_dim_size_; + param.value_stride_ = var_outer_dim_size_; + BucketReduceSparseGradient(param); MultiThreadComputeParams input_params; input_params.var_ = var; diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.cc index 636d92dcbb1..a19b014829b 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.cc @@ -123,13 +123,19 @@ bool SparseApplyLazyAdamCPUKernel::Launch(const std::vector auto indices = reinterpret_cast(inputs[10]->addr); auto new_grad = reinterpret_cast(workspace[0]->addr); auto new_indices = reinterpret_cast(workspace[1]->addr); - auto tmp_grad = reinterpret_cast(workspace[2]->addr); - auto tmp_indices = reinterpret_cast(workspace[3]->addr); + auto workspace_grad = reinterpret_cast(workspace[2]->addr); + auto workspace_indices = reinterpret_cast(workspace[3]->addr); SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_}); - SparseGradient tmp_sparse_grad({tmp_grad, tmp_indices, indices_size_}); - TwoLevelReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &tmp_sparse_grad, &unique_sparse_grad, - var_first_dim_size_, var_outer_dim_size_); + SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_}); + SparseGradient input_sparse_grad({grad, indices, indices_size_}); + ReduceSparseGradientParam param; + param.input_grad_ = &input_sparse_grad; + param.workspace_grad_ = &workspace_sparse_grad; + param.output_grad_ = &unique_sparse_grad; + param.max_index_ = var_first_dim_size_; + param.value_stride_ = var_outer_dim_size_; + BucketReduceSparseGradient(param); lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power); MultiThreadComputeParams input_params; diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc index efba35ad8c0..46fa07c3793 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc @@ -61,6 +61,8 @@ void SparseApplyProximalAdagradCPUKernel::InitInputOutputSize(const CNodePtr &ke MS_EXCEPTION_IF_NULL(kernel_node); workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float)); workspace_size_list_.emplace_back(indices_size_ * sizeof(int)); + workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float)); + workspace_size_list_.emplace_back(indices_size_ * sizeof(int)); } void SparseApplyProximalAdagradCPUKernel::InitKernel(const CNodePtr &kernel_node) { @@ -119,9 +121,19 @@ bool SparseApplyProximalAdagradCPUKernel::Launch(const std::vector(inputs[6]->addr); auto new_grad = reinterpret_cast(workspace[0]->addr); auto new_indices = reinterpret_cast(workspace[1]->addr); + auto workspace_grad = reinterpret_cast(workspace[2]->addr); + auto workspace_indices = reinterpret_cast(workspace[3]->addr); + SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_}); - ReduceSparseGradient(SparseGradient({grad, indices, indices_size_}), &unique_sparse_grad, var_first_dim_size_, - var_outer_dim_size_); + SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_}); + SparseGradient input_sparse_grad({grad, indices, indices_size_}); + ReduceSparseGradientParam param; + param.input_grad_ = &input_sparse_grad; + param.workspace_grad_ = &workspace_sparse_grad; + param.output_grad_ = &unique_sparse_grad; + param.max_index_ = var_first_dim_size_; + param.value_stride_ = var_outer_dim_size_; + BucketReduceSparseGradient(param); MultiThreadComputeParams input_params; input_params.var_ = var; diff --git a/mindspore/ccsrc/backend/session/cpu_session.cc b/mindspore/ccsrc/backend/session/cpu_session.cc index ca1c78d2066..b29b1cb1fb2 100644 --- a/mindspore/ccsrc/backend/session/cpu_session.cc +++ b/mindspore/ccsrc/backend/session/cpu_session.cc @@ -16,6 +16,7 @@ #include "backend/session/cpu_session.h" #include +#include #include "ir/tensor.h" #include "ir/anf.h" #include "backend/kernel_compiler/kernel.h" @@ -119,6 +120,48 @@ void CPUSession::SetKernelInfo(const KernelGraph *kernel_graph) { } } +namespace { +void KernelNotSupportException(const AnfNodePtr &kernel_node) { + std::string kernel_name = AnfAlgo::GetCNodeName(kernel_node); + std::stringstream operator_info; + operator_info << "Operator[" << kernel_name << "] "; + auto kernel_info = dynamic_cast(kernel_node->kernel_info()); + if (kernel_info == nullptr) { + operator_info << "is not support."; + MS_LOG(EXCEPTION) << operator_info.str(); + } + auto kernel_build_Info = kernel_info->select_kernel_build_info(); + if (kernel_build_Info == nullptr) { + operator_info << "is not support."; + MS_LOG(EXCEPTION) << operator_info.str(); + } + size_t input_num = kernel_build_Info->GetInputNum(); + if (input_num > 0) { + operator_info << " input("; + for (size_t i = 0; i < input_num; ++i) { + operator_info << TypeIdLabel(kernel_build_Info->GetInputDeviceType(i)); + if (i != input_num - 1) { + operator_info << ","; + } + } + operator_info << ") "; + } + size_t output_num = kernel_build_Info->GetOutputNum(); + if (output_num > 0) { + operator_info << "output("; + for (size_t i = 0; i < output_num; ++i) { + operator_info << TypeIdLabel(kernel_build_Info->GetOutputDeviceType(i)); + if (i != kernel_build_Info->GetOutputNum() - 1) { + operator_info << ","; + } + } + operator_info << ") "; + } + operator_info << "is not support."; + MS_LOG(EXCEPTION) << operator_info.str(); +} +} // namespace + void CPUSession::BuildKernel(const KernelGraph *kernel_graph) { MS_EXCEPTION_IF_NULL(kernel_graph); auto &kernel_nodes = kernel_graph->execution_order(); @@ -129,7 +172,7 @@ void CPUSession::BuildKernel(const KernelGraph *kernel_graph) { std::shared_ptr cpu_kernel = kernel::CPUKernelFactory::GetInstance().Create(kernel_name, kernel_node); if (cpu_kernel == nullptr) { - MS_LOG(EXCEPTION) << "Operator[" << kernel_name << "] is not support."; + KernelNotSupportException(kernel_node); } cpu_kernel->Init(kernel_node); AnfAlgo::SetKernelMod(cpu_kernel, kernel_node.get()); diff --git a/tests/ut/cpp/kernel/common_utils_test.cc b/tests/ut/cpp/kernel/common_utils_test.cc index 83f7c59e523..4e016cd4953 100644 --- a/tests/ut/cpp/kernel/common_utils_test.cc +++ b/tests/ut/cpp/kernel/common_utils_test.cc @@ -25,7 +25,7 @@ class CommonUtilTest : public UT::Common { CommonUtilTest() = default; }; -TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest1) { +TEST_F(CommonUtilTest, BucketReduceSparseGradient1) { // The indices is a vector and the grad is a tensor with shape (6, 2) /* 0 * 0 @@ -46,20 +46,39 @@ TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest1) { for (int i = 0; i < 6 * 2; i++) { grad.push_back(i); } - std::vector unique_indices(3); - std::vector summed_grad(6); - SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 0}); - ReduceSparseGradient(SparseGradient({grad.data(), indices.data(), 6}), &unique_grad, 6, 2); + std::vector unique_indices(6); + std::vector summed_grad(12); + std::vector tmp_indices(6); + std::vector tmp_grad(12); + + SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 6}); + SparseGradient workspace_grad({tmp_grad.data(), tmp_indices.data(), 6}); + SparseGradient input_grad({grad.data(), indices.data(), 6}); + + ReduceSparseGradientParam param; + param.input_grad_ = &input_grad; + param.workspace_grad_ = &workspace_grad; + param.output_grad_ = &unique_grad; + param.max_index_ = 6; + param.value_stride_ = 2; + BucketReduceSparseGradient(param); + EXPECT_EQ(unique_grad.indices_size_, 3); - EXPECT_EQ(unique_indices, std::vector({0, 1, 3})); + std::vector expect_indices({0, 1, 3}); + for (size_t i = 0; i < unique_grad.indices_size_; ++i) { + EXPECT_EQ(unique_grad.indices_[i], expect_indices[i]); + } /* 10 13 * 10 12 * 10 11 */ - EXPECT_EQ(summed_grad, std::vector({10, 13, 10, 12, 10, 11})); + std::vector expect_value({10, 13, 10, 12, 10, 11}); + for (size_t i = 0; i < unique_grad.indices_size_ * 2; ++i) { + EXPECT_EQ(unique_grad.value_[i], expect_value[i]); + } } -TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest2) { +TEST_F(CommonUtilTest, BucketReduceSparseGradient2) { // The indices is a vector and the grad is a tensor with shape (6, 2) /* 0 * 0 @@ -80,16 +99,36 @@ TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest2) { for (int i = 0; i < 6 * 2; i++) { grad.push_back(i); } - std::vector unique_indices(2); - std::vector summed_grad(4); - SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 0}); - ReduceSparseGradient(SparseGradient({grad.data(), indices.data(), 6}), &unique_grad, 6, 2); + std::vector unique_indices(6); + std::vector summed_grad(12); + std::vector tmp_indices(6); + std::vector tmp_grad(12); + SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 6}); + SparseGradient workspace_grad({tmp_grad.data(), tmp_indices.data(), 6}); + SparseGradient input_grad({grad.data(), indices.data(), 6}); + + ReduceSparseGradientParam param; + param.input_grad_ = &input_grad; + param.workspace_grad_ = &workspace_grad; + param.output_grad_ = &unique_grad; + param.max_index_ = 6; + param.value_stride_ = 2; + BucketReduceSparseGradient(param); + EXPECT_EQ(unique_grad.indices_size_, 2); - EXPECT_EQ(unique_indices, std::vector({0, 1})); + + std::vector expect_indices({0, 1}); + for (size_t i = 0; i < unique_grad.indices_size_; ++i) { + EXPECT_EQ(unique_grad.indices_[i], expect_indices[i]); + } + /* 10 13 * 10 12 */ - EXPECT_EQ(summed_grad, std::vector({10, 13, 10, 12})); + std::vector expect_value({10, 13, 10, 12}); + for (size_t i = 0; i < unique_grad.indices_size_ * 2; ++i) { + EXPECT_EQ(unique_grad.value_[i], expect_value[i]); + } } } // namespace kernel } // namespace mindspore