!2989 bucket reduce sparse gradient

Merge pull request !2989 from kisnwang/two-step-reduce-sparse-gradient
mindspore-ci-bot 2020-07-17 11:50:58 +08:00 committed by Gitee
commit 88c4943abd
8 changed files with 465 additions and 253 deletions

View File

@@ -20,6 +20,7 @@
#include <iostream>
#include <utility>
#include <fstream>
#include <algorithm>
#include <thread>
#include "nlohmann/json.hpp"
#include "backend/session/anf_runtime_algorithm.h"
@@ -499,235 +500,329 @@ int Sign(float x) {
return 0;
}
// Removed by this commit: DeduplicateIndexedSlices.
namespace {
struct BucketSparseGradient {
float *value_;
int *indices_;
int *global_indices_;
size_t indices_size_;
};
struct MultiThreadReduceSparseGradientParam {
SparseGradient *input_grad_{nullptr};
SparseGradient *workspace_grad_{nullptr};
SparseGradient *output_grad_{nullptr};
size_t max_index_{0};
size_t value_stride_{0};
size_t thread_num_{0};
bool use_sort_reduce_{false};
};
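// The helpers below implement the bucket reduce in four stages:
//   1. Split the input indices into one segment per thread and count, per
//      segment, how many indices fall into each bucket (bucket_id = index %
//      thread_num).
//   2. Scatter every segment's indices into contiguous per-bucket slices of
//      the output; BucketSparseGradient views such a slice, and its
//      global_indices_ records each row's position in the original input.
//   3. Reduce every bucket in parallel into the workspace, either by sorting
//      or with a hash map.
//   4. Merge the reduced buckets back into the output gradient.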
void CalculateEachBucketSize(const std::shared_ptr<SparseGradient> &sparse_grad, size_t max_index,
std::vector<size_t> *each_bucket_size) {
MS_LOG(DEBUG) << "Start";
MS_EXCEPTION_IF_NULL(sparse_grad);
MS_EXCEPTION_IF_NULL(sparse_grad->indices_);
MS_EXCEPTION_IF_NULL(each_bucket_size);
size_t bucket_num = each_bucket_size->size();
for (size_t i = 0; i < sparse_grad->indices_size_; ++i) {
int index = sparse_grad->indices_[i];
if (index >= 0 && IntToSize(index) < max_index) {
auto bucket_id = index % bucket_num;
each_bucket_size->at(bucket_id)++;
}
}
MS_LOG(DEBUG) << "End";
}
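// Splits the input indices evenly into param.thread_num_ segments (the first
// indices_size_ % thread_num_ segments get one extra element) and counts each
// segment's bucket sizes in parallel.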
void SplitAndCalculateSegmentBucketSize(const MultiThreadReduceSparseGradientParam &param,
std::vector<std::shared_ptr<SparseGradient>> *segments_ptr,
std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr) {
MS_EXCEPTION_IF_NULL(param.input_grad_);
MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
MS_EXCEPTION_IF_NULL(segments_ptr);
auto &segments = *segments_ptr;
auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;
auto input_grad = param.input_grad_;
if (param.thread_num_ < 1) {
MS_EXCEPTION(ArgumentError) << "Input param thread num must > 0!";
}
size_t thread_indices_size = input_grad->indices_size_ / param.thread_num_;
size_t left_indices_size = input_grad->indices_size_ % param.thread_num_;
std::vector<std::thread> threads;
threads.reserve(param.thread_num_);
segments.reserve(param.thread_num_);
size_t current_indices_offset = 0;
for (size_t i = 0; i < param.thread_num_; ++i) {
segment_bucket_sizes.emplace_back(std::make_shared<std::vector<size_t>>(param.thread_num_, 0));
size_t indices_size = thread_indices_size;
if (i < left_indices_size) {
indices_size += 1;
}
segments.emplace_back(std::make_shared<SparseGradient>());
segments[i]->value_ = input_grad->value_ + current_indices_offset * param.value_stride_;
segments[i]->indices_ = input_grad->indices_ + current_indices_offset;
segments[i]->indices_size_ = indices_size;
threads.emplace_back(
std::thread(CalculateEachBucketSize, segments[i], param.max_index_, segment_bucket_sizes[i].get()));
current_indices_offset += indices_size;
}
for (size_t i = 0; i < param.thread_num_; ++i) {
threads[i].join();
}
}
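// Scatters one segment's in-range indices into its per-bucket slices, and
// records for every copied index the global row offset of its value data.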
void CopySegmentIndicesToBucket(const MultiThreadReduceSparseGradientParam &param,
const std::shared_ptr<SparseGradient> &segment, size_t bucket_offset,
const std::vector<std::shared_ptr<BucketSparseGradient>> &buckets) {
MS_LOG(DEBUG) << "Start";
MS_EXCEPTION_IF_NULL(segment);
MS_EXCEPTION_IF_NULL(segment->indices_);
std::vector<size_t> bucket_data_num(param.thread_num_, 0);
for (size_t i = 0; i < segment->indices_size_; ++i) {
int index = segment->indices_[i];
if (index >= 0 && IntToSize(index) < param.max_index_) {
auto bucket_id = index % param.thread_num_;
auto bucket_index = bucket_data_num[bucket_id];
buckets[bucket_id]->indices_[bucket_index] = index;
buckets[bucket_id]->global_indices_[bucket_index] = bucket_offset + i;
bucket_data_num[bucket_id]++;
}
}
MS_LOG(DEBUG) << "End";
}
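// Sums the per-segment bucket sizes into total bucket sizes, carves matching
// slices out of the output and workspace buffers, and copies all segments'
// indices into their buckets in parallel.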
void GatherSegmentIndicesToOutputBucket(const MultiThreadReduceSparseGradientParam &param,
const std::vector<std::shared_ptr<SparseGradient>> &segments,
const std::vector<std::shared_ptr<std::vector<size_t>>> &segment_bucket_sizes,
std::vector<std::shared_ptr<BucketSparseGradient>> *buckets_ptr) {
MS_EXCEPTION_IF_NULL(param.output_grad_);
MS_EXCEPTION_IF_NULL(param.output_grad_->value_);
MS_EXCEPTION_IF_NULL(param.output_grad_->indices_);
MS_EXCEPTION_IF_NULL(buckets_ptr);
auto &buckets = *buckets_ptr;
size_t thread_num = param.thread_num_;
if (thread_num != segment_bucket_sizes.size()) {
MS_EXCEPTION(ArgumentError) << "Input param thread num not equal to segment size!";
}
std::vector<size_t> bucket_data_size(thread_num, 0);
for (size_t i = 0; i < thread_num; ++i) {
for (size_t j = 0; j < thread_num; ++j) {
bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
}
}
size_t current_indices_offset = 0;
for (size_t i = 0; i < thread_num; ++i) {
buckets.emplace_back(std::make_shared<BucketSparseGradient>());
buckets[i]->value_ = param.output_grad_->value_ + current_indices_offset * param.value_stride_;
buckets[i]->indices_ = param.output_grad_->indices_ + current_indices_offset;
buckets[i]->global_indices_ = param.workspace_grad_->indices_ + current_indices_offset;
buckets[i]->indices_size_ = bucket_data_size[i];
current_indices_offset += bucket_data_size[i];
}
std::vector<size_t> tmp_bucket_data_size(thread_num, 0);
std::vector<std::vector<std::shared_ptr<BucketSparseGradient>>> each_thread_buckets;
for (size_t i = 0; i < thread_num; ++i) {
std::vector<std::shared_ptr<BucketSparseGradient>> thread_buckets;
for (size_t j = 0; j < thread_num; ++j) {
thread_buckets.emplace_back(std::make_shared<BucketSparseGradient>());
thread_buckets[j]->indices_ = buckets[j]->indices_ + tmp_bucket_data_size[j];
thread_buckets[j]->global_indices_ = buckets[j]->global_indices_ + tmp_bucket_data_size[j];
thread_buckets[j]->value_ = buckets[j]->value_ + tmp_bucket_data_size[j] * param.value_stride_;
thread_buckets[j]->indices_size_ = segment_bucket_sizes[i]->at(j);
tmp_bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
}
each_thread_buckets.emplace_back(thread_buckets);
}
std::vector<std::thread> threads;
threads.reserve(thread_num);
current_indices_offset = 0;
for (size_t i = 0; i < thread_num; ++i) {
threads.emplace_back(
std::thread(CopySegmentIndicesToBucket, param, segments[i], current_indices_offset, each_thread_buckets[i]));
current_indices_offset += segments[i]->indices_size_;
}
for (size_t i = 0; i < thread_num; ++i) {
threads[i].join();
}
}
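// Sort-based reduction of one bucket: sorts (index, global_index) pairs, then
// copies the first occurrence of each index and accumulates the rest.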
void SortAndReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam &param,
const std::shared_ptr<BucketSparseGradient> &bucket,
const std::shared_ptr<SparseGradient> &reduced_bucket) {
MS_LOG(DEBUG) << "Start";
MS_EXCEPTION_IF_NULL(bucket);
MS_EXCEPTION_IF_NULL(bucket->value_);
MS_EXCEPTION_IF_NULL(bucket->indices_);
MS_EXCEPTION_IF_NULL(reduced_bucket);
MS_EXCEPTION_IF_NULL(reduced_bucket->value_);
MS_EXCEPTION_IF_NULL(reduced_bucket->indices_);
std::vector<std::pair<int, int>> sorted_indices;
sorted_indices.reserve(bucket->indices_size_);
for (size_t i = 0; i < bucket->indices_size_; ++i) {
int index = bucket->indices_[i];
int global_index = bucket->global_indices_[i];
sorted_indices.emplace_back(std::pair<int, int>(index, global_index));
}
std::sort(sorted_indices.begin(), sorted_indices.end());
float *global_value = param.input_grad_->value_;
size_t unique_indices_size = 0;
size_t max_length = reduced_bucket->indices_size_ * param.value_stride_;
int last_index{0};
size_t value_offset{0};
for (size_t i = 0; i < sorted_indices.size(); ++i) {
int index = sorted_indices[i].first;
int global_index = sorted_indices[i].second;
int global_value_offset = global_index * param.value_stride_;
if (i == 0 || index != last_index) {
if (i != 0) {
unique_indices_size++;
}
reduced_bucket->indices_[unique_indices_size] = index;
value_offset = unique_indices_size * param.value_stride_;
auto ret_code = memcpy_s(reduced_bucket->value_ + value_offset, (max_length - value_offset) * sizeof(float),
global_value + global_value_offset, param.value_stride_ * sizeof(float));
if (ret_code != EOK) {
MS_LOG(EXCEPTION) << "Failed to copy data!";
}
} else {
for (size_t j = 0; j < param.value_stride_; ++j) {
reduced_bucket->value_[value_offset + j] += global_value[global_value_offset + j];
}
}
last_index = index;
}
  // unique_indices_size is the slot index of the last unique entry; convert
  // it to a count (zero stays zero for an empty bucket).
  if (!sorted_indices.empty()) {
    unique_indices_size++;
  }
  reduced_bucket->indices_size_ = unique_indices_size;
MS_LOG(DEBUG) << "End";
}
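// Hash-based reduction of one bucket: the first occurrence of an index is
// copied into the reduced bucket, and later occurrences are accumulated at
// the value offset remembered in index_map.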
void ReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam &param,
const std::shared_ptr<BucketSparseGradient> &bucket,
const std::shared_ptr<SparseGradient> &reduced_bucket) {
MS_LOG(DEBUG) << "Start";
MS_EXCEPTION_IF_NULL(bucket);
MS_EXCEPTION_IF_NULL(bucket->value_);
MS_EXCEPTION_IF_NULL(bucket->indices_);
MS_EXCEPTION_IF_NULL(reduced_bucket);
MS_EXCEPTION_IF_NULL(reduced_bucket->value_);
MS_EXCEPTION_IF_NULL(reduced_bucket->indices_);
float *global_value = param.input_grad_->value_;
std::unordered_map<int, size_t> index_map;
size_t unique_indices_size = 0;
  size_t max_length = reduced_bucket->indices_size_ * param.value_stride_;
  for (size_t i = 0; i < bucket->indices_size_; ++i) {
    int index = bucket->indices_[i];
    int global_index = bucket->global_indices_[i];
    auto iter = index_map.find(index);
    if (iter == index_map.end()) {
      reduced_bucket->indices_[unique_indices_size] = index;
      size_t start_index = unique_indices_size * param.value_stride_;
      index_map[index] = start_index;
      auto ret_code = memcpy_s(reduced_bucket->value_ + start_index, (max_length - start_index) * sizeof(float),
                               global_value + global_index * param.value_stride_, param.value_stride_ * sizeof(float));
      if (ret_code != EOK) {
        MS_LOG(EXCEPTION) << "Failed to copy data!";
      }
      unique_indices_size++;
    } else {
      size_t start_index = iter->second;
      size_t end_index = start_index + param.value_stride_;
      for (size_t j = start_index, k = global_index * param.value_stride_; j < end_index; ++j, ++k) {
        reduced_bucket->value_[j] += global_value[k];
      }
    }
  }
  reduced_bucket->indices_size_ = unique_indices_size;
  MS_LOG(DEBUG) << "End";
}
// Removed by this commit: WorkerParamsForReduceSparseGradient,
// WorkerForReduceSparseGradient, and RunMultiThreadReduceSparseGradient.
// Removed by this commit: ReduceSparseGradient, ReduceMultiSparseGradient, and
// TwoLevelReduceSparseGradient (all replaced by BucketReduceSparseGradient below).
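// Reduces all buckets into the workspace in parallel, picking the sort-based
// or hash-based reducer according to param.use_sort_reduce_.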
void ReduceBucketSparseGradientToWorkspace(const MultiThreadReduceSparseGradientParam &param,
const std::vector<std::shared_ptr<BucketSparseGradient>> &buckets,
std::vector<std::shared_ptr<SparseGradient>> *reduced_buckets_ptr) {
MS_EXCEPTION_IF_NULL(param.workspace_grad_);
MS_EXCEPTION_IF_NULL(param.workspace_grad_->value_);
MS_EXCEPTION_IF_NULL(param.workspace_grad_->indices_);
MS_EXCEPTION_IF_NULL(reduced_buckets_ptr);
auto &reduced_buckets = *reduced_buckets_ptr;
size_t thread_num = buckets.size();
std::vector<std::thread> threads;
threads.reserve(thread_num);
size_t current_indices_offset = 0;
for (size_t i = 0; i < thread_num; ++i) {
reduced_buckets.emplace_back(std::make_shared<SparseGradient>());
reduced_buckets[i]->value_ = param.workspace_grad_->value_ + current_indices_offset * param.value_stride_;
reduced_buckets[i]->indices_ = param.workspace_grad_->indices_ + current_indices_offset;
reduced_buckets[i]->indices_size_ = buckets[i]->indices_size_;
if (param.use_sort_reduce_) {
threads.emplace_back(std::thread(SortAndReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i]));
} else {
threads.emplace_back(std::thread(ReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i]));
}
current_indices_offset += buckets[i]->indices_size_;
}
for (size_t i = 0; i < thread_num; ++i) {
threads[i].join();
}
}
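// Concatenates the reduced buckets into the output gradient. Buckets hold
// disjoint index sets (bucket_id = index % thread_num), so no deduplication
// across buckets is needed.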
void MergeReduceSparseGradient(const MultiThreadReduceSparseGradientParam &param,
const std::vector<std::shared_ptr<SparseGradient>> &reduced_buckets) {
MS_EXCEPTION_IF_NULL(param.output_grad_);
auto output_grad = param.output_grad_;
MS_EXCEPTION_IF_NULL(output_grad->value_);
MS_EXCEPTION_IF_NULL(output_grad->indices_);
size_t stride_data_size = param.value_stride_ * sizeof(float);
size_t unique_indices_size = 0;
for (size_t i = 0; i < reduced_buckets.size(); ++i) {
auto &bucket = reduced_buckets[i];
MS_EXCEPTION_IF_NULL(bucket);
if (bucket->indices_size_ == 0) {
continue;
}
auto ret_code = memcpy_s(output_grad->value_ + unique_indices_size * param.value_stride_,
(output_grad->indices_size_ - unique_indices_size) * stride_data_size, bucket->value_,
bucket->indices_size_ * stride_data_size);
if (ret_code != EOK) {
MS_LOG(EXCEPTION) << "Failed to copy data!";
}
ret_code = memcpy_s(output_grad->indices_ + unique_indices_size,
(output_grad->indices_size_ - unique_indices_size) * sizeof(int), bucket->indices_,
bucket->indices_size_ * sizeof(int));
if (ret_code != EOK) {
MS_LOG(EXCEPTION) << "Failed to copy data!";
}
unique_indices_size += bucket->indices_size_;
}
output_grad->indices_size_ = unique_indices_size;
}
} // namespace
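// Public entry point. The caller provides input, workspace, and output
// gradients whose buffers each hold input_grad_->indices_size_ rows; the
// reduction uses at most 23 worker threads.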
void BucketReduceSparseGradient(const ReduceSparseGradientParam &param) {
MS_LOG(DEBUG) << "Start";
MS_EXCEPTION_IF_NULL(param.input_grad_);
size_t thread_num = 23;
if (param.input_grad_->indices_size_ < thread_num) {
thread_num = param.input_grad_->indices_size_;
}
MultiThreadReduceSparseGradientParam multi_thread_param({param.input_grad_, param.workspace_grad_, param.output_grad_,
param.max_index_, param.value_stride_, thread_num,
param.use_sort_reduce_});
std::vector<std::shared_ptr<SparseGradient>> segments;
std::vector<std::shared_ptr<std::vector<size_t>>> segment_bucket_sizes;
SplitAndCalculateSegmentBucketSize(multi_thread_param, &segments, &segment_bucket_sizes);
std::vector<std::shared_ptr<BucketSparseGradient>> buckets;
GatherSegmentIndicesToOutputBucket(multi_thread_param, segments, segment_bucket_sizes, &buckets);
std::vector<std::shared_ptr<SparseGradient>> reduced_buckets;
ReduceBucketSparseGradientToWorkspace(multi_thread_param, buckets, &reduced_buckets);
MergeReduceSparseGradient(multi_thread_param, reduced_buckets);
MS_LOG(DEBUG) << "End";
}

View File

@@ -73,9 +73,18 @@ class KernelMeta {
};
struct SparseGradient {
float *value_{nullptr};
int *indices_{nullptr};
size_t indices_size_{0};
};
struct ReduceSparseGradientParam {
SparseGradient *input_grad_{nullptr};
SparseGradient *workspace_grad_{nullptr};
SparseGradient *output_grad_{nullptr};
size_t max_index_{0};
size_t value_stride_{0};
bool use_sort_reduce_{false};
};
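// Parameters for BucketReduceSparseGradient: input_grad_ holds the raw,
// possibly duplicated indices and values; workspace_grad_ supplies scratch
// buffers of the same capacity; output_grad_ receives the deduplicated
// result; max_index_ bounds the valid index range (the variable's first
// dimension); value_stride_ is the number of floats per gradient row.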
struct MultiThreadComputeParams {
@@ -112,10 +121,6 @@ void SaveJsonInfo(const std::string &json_name, const std::string &info);
std::string GetProcessor(const AnfNodePtr &anf_node);
bool IsSameShape(const std::vector<size_t> &shape_a, const std::vector<size_t> &shape_b);
int Sign(float x);
std::pair<AnfNodePtr, size_t> GetKernelInput(const AnfNodePtr &anf_node, size_t index);
std::vector<std::pair<AnfNodePtr, std::pair<size_t, size_t>>> GetInputIndex(const std::vector<AnfNodePtr> &node_list,
const std::vector<AnfNodePtr> &input_list);
@@ -130,14 +135,7 @@ void GetGraphRealOutput(const FuncGraphPtr &func_graph, std::vector<std::pair<An
bool IsWeightBoundary(const AnfNodePtr &node);
void MultiThreadCompute(const MultiThreadComputeFunc &func, MultiThreadComputeParams *params,
size_t total_compute_size);
void BucketReduceSparseGradient(const ReduceSparseGradientParam &param);
std::vector<int> GetReduceAttrAxis(const CNodePtr &cnode);
} // namespace kernel
} // namespace mindspore

View File

@@ -81,6 +81,8 @@ void SparseApplyAdamCPUKernel::InitInputOutputSize(const CNodePtr &kernel_node)
MS_EXCEPTION_IF_NULL(kernel_node);
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
workspace_size_list_.emplace_back(var_first_dim_size_ * var_outer_dim_size_ * sizeof(float));
}
@@ -142,11 +144,21 @@ bool SparseApplyAdamCPUKernel::Launch(const std::vector<kernel::AddressPtr> &inp
auto indices = reinterpret_cast<int *>(inputs[10]->addr);
auto new_grad = reinterpret_cast<float *>(workspace[0]->addr);
auto new_indices = reinterpret_cast<int *>(workspace[1]->addr);
auto workspace_grad = reinterpret_cast<float *>(workspace[2]->addr);
auto workspace_indices = reinterpret_cast<int *>(workspace[3]->addr);
auto m_t = reinterpret_cast<float *>(workspace[4]->addr);
SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_});
SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_});
SparseGradient input_sparse_grad({grad, indices, indices_size_});
ReduceSparseGradientParam param;
param.input_grad_ = &input_sparse_grad;
param.workspace_grad_ = &workspace_sparse_grad;
param.output_grad_ = &unique_sparse_grad;
param.max_index_ = var_first_dim_size_;
param.value_stride_ = var_outer_dim_size_;
BucketReduceSparseGradient(param);
size_t total_dim_size = var_first_dim_size_ * var_outer_dim_size_;
lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power);

View File

@@ -132,12 +132,19 @@ bool SparseApplyFtrlCPUKernel::Launch(const std::vector<kernel::AddressPtr> &inp
auto indices = reinterpret_cast<int *>(inputs[4]->addr);
auto new_grad = reinterpret_cast<float *>(workspace[0]->addr);
auto new_indices = reinterpret_cast<int *>(workspace[1]->addr);
auto workspace_grad = reinterpret_cast<float *>(workspace[2]->addr);
auto workspace_indices = reinterpret_cast<int *>(workspace[3]->addr);
SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_});
SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_});
SparseGradient input_sparse_grad({grad, indices, indices_size_});
ReduceSparseGradientParam param;
param.input_grad_ = &input_sparse_grad;
param.workspace_grad_ = &workspace_sparse_grad;
param.output_grad_ = &unique_sparse_grad;
param.max_index_ = var_first_dim_size_;
param.value_stride_ = var_outer_dim_size_;
BucketReduceSparseGradient(param);
MultiThreadComputeParams input_params;
input_params.var_ = var;

View File

@@ -123,13 +123,19 @@ bool SparseApplyLazyAdamCPUKernel::Launch(const std::vector<kernel::AddressPtr>
auto indices = reinterpret_cast<int *>(inputs[10]->addr);
auto new_grad = reinterpret_cast<float *>(workspace[0]->addr);
auto new_indices = reinterpret_cast<int *>(workspace[1]->addr);
auto workspace_grad = reinterpret_cast<float *>(workspace[2]->addr);
auto workspace_indices = reinterpret_cast<int *>(workspace[3]->addr);
SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_});
SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_});
SparseGradient input_sparse_grad({grad, indices, indices_size_});
ReduceSparseGradientParam param;
param.input_grad_ = &input_sparse_grad;
param.workspace_grad_ = &workspace_sparse_grad;
param.output_grad_ = &unique_sparse_grad;
param.max_index_ = var_first_dim_size_;
param.value_stride_ = var_outer_dim_size_;
BucketReduceSparseGradient(param);
lr = lr * std::sqrt(1 - beta2_power) / (1 - beta1_power);
MultiThreadComputeParams input_params;

View File

@@ -61,6 +61,8 @@ void SparseApplyProximalAdagradCPUKernel::InitInputOutputSize(const CNodePtr &ke
MS_EXCEPTION_IF_NULL(kernel_node);
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
}
void SparseApplyProximalAdagradCPUKernel::InitKernel(const CNodePtr &kernel_node) {
@@ -119,9 +121,19 @@ bool SparseApplyProximalAdagradCPUKernel::Launch(const std::vector<kernel::Addre
auto indices = reinterpret_cast<int *>(inputs[6]->addr);
auto new_grad = reinterpret_cast<float *>(workspace[0]->addr);
auto new_indices = reinterpret_cast<int *>(workspace[1]->addr);
auto workspace_grad = reinterpret_cast<float *>(workspace[2]->addr);
auto workspace_indices = reinterpret_cast<int *>(workspace[3]->addr);
SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size_});
SparseGradient workspace_sparse_grad({workspace_grad, workspace_indices, indices_size_});
SparseGradient input_sparse_grad({grad, indices, indices_size_});
ReduceSparseGradientParam param;
param.input_grad_ = &input_sparse_grad;
param.workspace_grad_ = &workspace_sparse_grad;
param.output_grad_ = &unique_sparse_grad;
param.max_index_ = var_first_dim_size_;
param.value_stride_ = var_outer_dim_size_;
BucketReduceSparseGradient(param);
MultiThreadComputeParams input_params;
input_params.var_ = var;

View File

@@ -16,6 +16,7 @@
#include "backend/session/cpu_session.h"
#include <algorithm>
#include <sstream>
#include "ir/tensor.h"
#include "ir/anf.h"
#include "backend/kernel_compiler/kernel.h"
@@ -148,6 +149,48 @@ void CPUSession::SetKernelInfo(const KernelGraph *kernel_graph) {
}
}
namespace {
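// Builds and throws a diagnostic describing the unsupported kernel, e.g.
// (illustrative message only; the exact type labels come from TypeIdLabel):
//   Operator[Foo] input(kNumberTypeFloat32,kNumberTypeInt32) output(kNumberTypeFloat32) is not supported.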
void KernelNotSupportException(const AnfNodePtr &kernel_node) {
  std::string kernel_name = AnfAlgo::GetCNodeName(kernel_node);
  std::stringstream operator_info;
  operator_info << "Operator[" << kernel_name << "] ";
  auto kernel_info = dynamic_cast<device::KernelInfo *>(kernel_node->kernel_info());
  if (kernel_info == nullptr) {
    operator_info << "is not supported.";
    MS_LOG(EXCEPTION) << operator_info.str();
  }
  auto kernel_build_info = kernel_info->select_kernel_build_info();
  if (kernel_build_info == nullptr) {
    operator_info << "is not supported.";
    MS_LOG(EXCEPTION) << operator_info.str();
  }
  size_t input_num = kernel_build_info->GetInputNum();
  if (input_num > 0) {
    operator_info << "input(";
    for (size_t i = 0; i < input_num; ++i) {
      operator_info << TypeIdLabel(kernel_build_info->GetInputDeviceType(i));
      if (i != input_num - 1) {
        operator_info << ",";
      }
    }
    operator_info << ") ";
  }
  size_t output_num = kernel_build_info->GetOutputNum();
  if (output_num > 0) {
    operator_info << "output(";
    for (size_t i = 0; i < output_num; ++i) {
      operator_info << TypeIdLabel(kernel_build_info->GetOutputDeviceType(i));
      if (i != output_num - 1) {
        operator_info << ",";
      }
    }
    operator_info << ") ";
  }
  operator_info << "is not supported.";
  MS_LOG(EXCEPTION) << operator_info.str();
}
} // namespace
void CPUSession::BuildKernel(const KernelGraph *kernel_graph) {
MS_EXCEPTION_IF_NULL(kernel_graph);
auto &kernel_nodes = kernel_graph->execution_order();
@@ -158,7 +201,7 @@ void CPUSession::BuildKernel(const KernelGraph *kernel_graph) {
std::shared_ptr<kernel::CPUKernel> cpu_kernel =
kernel::CPUKernelFactory::GetInstance().Create(kernel_name, kernel_node);
if (cpu_kernel == nullptr) {
MS_LOG(EXCEPTION) << "Operator[" << kernel_name << "] is not support.";
KernelNotSupportException(kernel_node);
}
cpu_kernel->Init(kernel_node);
AnfAlgo::SetKernelMod(cpu_kernel, kernel_node.get());

View File

@@ -25,7 +25,7 @@ class CommonUtilTest : public UT::Common {
CommonUtilTest() = default;
};
TEST_F(CommonUtilTest, BucketReduceSparseGradient1) {
// The indices is a vector and the grad is a tensor with shape (6, 2)
/* 0
* 0
@@ -46,20 +46,39 @@ TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest1) {
for (int i = 0; i < 6 * 2; i++) {
grad.push_back(i);
}
std::vector<int> unique_indices(6);
std::vector<float> summed_grad(12);
std::vector<int> tmp_indices(6);
std::vector<float> tmp_grad(12);
SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 6});
SparseGradient workspace_grad({tmp_grad.data(), tmp_indices.data(), 6});
SparseGradient input_grad({grad.data(), indices.data(), 6});
ReduceSparseGradientParam param;
param.input_grad_ = &input_grad;
param.workspace_grad_ = &workspace_grad;
param.output_grad_ = &unique_grad;
param.max_index_ = 6;
param.value_stride_ = 2;
BucketReduceSparseGradient(param);
EXPECT_EQ(unique_grad.indices_size_, 3);
std::vector<int> expect_indices({0, 1, 3});
for (size_t i = 0; i < unique_grad.indices_size_; ++i) {
EXPECT_EQ(unique_grad.indices_[i], expect_indices[i]);
}
/* 10 13
* 10 12
* 10 11
*/
std::vector<int> expect_value({10, 13, 10, 12, 10, 11});
for (size_t i = 0; i < unique_grad.indices_size_ * 2; ++i) {
EXPECT_EQ(unique_grad.value_[i], expect_value[i]);
}
}
TEST_F(CommonUtilTest, BucketReduceSparseGradient2) {
// The indices is a vector and the grad is a tensor with shape (6, 2)
/* 0
* 0
@@ -80,16 +99,36 @@ TEST_F(CommonUtilTest, DeduplicateIndexedSlicesTest2) {
for (int i = 0; i < 6 * 2; i++) {
grad.push_back(i);
}
std::vector<int> unique_indices(6);
std::vector<float> summed_grad(12);
std::vector<int> tmp_indices(6);
std::vector<float> tmp_grad(12);
SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 6});
SparseGradient workspace_grad({tmp_grad.data(), tmp_indices.data(), 6});
SparseGradient input_grad({grad.data(), indices.data(), 6});
ReduceSparseGradientParam param;
param.input_grad_ = &input_grad;
param.workspace_grad_ = &workspace_grad;
param.output_grad_ = &unique_grad;
param.max_index_ = 6;
param.value_stride_ = 2;
BucketReduceSparseGradient(param);
EXPECT_EQ(unique_grad.indices_size_, 2);
std::vector<int> expect_indices({0, 1});
for (size_t i = 0; i < unique_grad.indices_size_; ++i) {
EXPECT_EQ(unique_grad.indices_[i], expect_indices[i]);
}
/* 10 13
* 10 12
*/
std::vector<int> expect_value({10, 13, 10, 12});
for (size_t i = 0; i < unique_grad.indices_size_ * 2; ++i) {
EXPECT_EQ(unique_grad.value_[i], expect_value[i]);
}
}
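// A minimal sketch of an additional case (not in the original commit) that
// exercises the sort-based path via use_sort_reduce_; the input indices
// {0, 0, 1, 1, 0, 3} are reconstructed from the expected sums above, and the
// test name is hypothetical.
TEST_F(CommonUtilTest, BucketReduceSparseGradientSortReduce) {
  // Same (6, 2) gradient as above; rows with equal indices must be summed.
  std::vector<int> indices{0, 0, 1, 1, 0, 3};
  std::vector<float> grad;
  for (int i = 0; i < 6 * 2; i++) {
    grad.push_back(i);
  }
  std::vector<int> unique_indices(6);
  std::vector<float> summed_grad(12);
  std::vector<int> tmp_indices(6);
  std::vector<float> tmp_grad(12);
  SparseGradient unique_grad({summed_grad.data(), unique_indices.data(), 6});
  SparseGradient workspace_grad({tmp_grad.data(), tmp_indices.data(), 6});
  SparseGradient input_grad({grad.data(), indices.data(), 6});
  ReduceSparseGradientParam param;
  param.input_grad_ = &input_grad;
  param.workspace_grad_ = &workspace_grad;
  param.output_grad_ = &unique_grad;
  param.max_index_ = 6;
  param.value_stride_ = 2;
  param.use_sort_reduce_ = true;
  BucketReduceSparseGradient(param);
  // The sort path must produce the same result as the hash path.
  EXPECT_EQ(unique_grad.indices_size_, 3);
  std::vector<int> expect_indices({0, 1, 3});
  for (size_t i = 0; i < unique_grad.indices_size_; ++i) {
    EXPECT_EQ(unique_grad.indices_[i], expect_indices[i]);
  }
  std::vector<float> expect_value({10, 13, 10, 12, 10, 11});
  for (size_t i = 0; i < unique_grad.indices_size_ * 2; ++i) {
    EXPECT_EQ(unique_grad.value_[i], expect_value[i]);
  }
}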
} // namespace kernel
} // namespace mindspore