From 442b38dc20954a96dc3e69acf63db829df9ef071 Mon Sep 17 00:00:00 2001 From: ZPaC Date: Wed, 26 Aug 2020 19:36:06 +0800 Subject: [PATCH] Delete extra file --- .../cpu/ps/sparse_apply_ftrl_ps_kernel.cc | 4 - .../ps/sparse_apply_lazy_adam_ps_kernel.cc | 4 - .../frontend/parallel/ps/parameter_server.h | 68 ++++++------- .../ccsrc/frontend/parallel/ps/worker_proxy.h | 99 ++++++++++--------- 4 files changed, 88 insertions(+), 87 deletions(-) diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc index cebb157c3c4..c11fb1b1d2f 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc @@ -88,10 +88,6 @@ void SparseApplyFtrlPSKernel::ReInit(const std::vector &inputs) { bool SparseApplyFtrlPSKernel::Execute(const std::vector &inputs, const std::vector &workspace, const std::vector &outputs) { ReInit(inputs); - int *indices = reinterpret_cast(inputs[4]->addr); - for (size_t i = 0; i < inputs[4]->size / sizeof(int); i++) { - indices[i] -= row_offset_; - } return Launch(inputs, workspace, outputs); } diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc index da9f7463da0..82fbfc3a2aa 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc @@ -86,10 +86,6 @@ bool SparseApplyLazyAdamPSKernel::Execute(const std::vector &inputs, const std::vector &workspace, const std::vector &outputs) { ReInit(inputs); - int *indices = reinterpret_cast(inputs[10]->addr); - for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) { - indices[i] -= row_offset_; - } return Launch(inputs, workspace, outputs); } diff --git a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h index 2032e177f57..03c0b768ca3 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h +++ b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h @@ -511,28 +511,27 @@ void ParameterServer::UpdateWeights() { MS_EXCEPTION_IF_NULL(optimizer); std::shared_ptr optim_info = optim_infos_[key]; - if (optim_info == nullptr) { - continue; - } - const std::vector &inputs = optim_info->inputs(); - const std::vector &workspaces = optim_info->workspaces(); - const std::vector &outputs = optim_info->outputs(); + if (optim_info != nullptr) { + const std::vector &inputs = optim_info->inputs(); + const std::vector &workspaces = optim_info->workspaces(); + const std::vector &outputs = optim_info->outputs(); - std::shared_ptr>>> shapes = - std::make_shared>>>(); - std::shared_ptr> indices_shape = std::make_shared>(); - indices_shape->emplace_back(optim_info->indice_size()); - shapes->push_back(indices_shape); + std::shared_ptr>>> shapes = + std::make_shared>>>(); + std::shared_ptr> indices_shape = std::make_shared>(); + indices_shape->emplace_back(optim_info->indice_size()); + shapes->push_back(indices_shape); - if (original_optim_inputs_shape_.count(key) != 0) { - for (auto &input_shapes : *(original_optim_inputs_shape_[key])) { - shapes->push_back(input_shapes); + if (original_optim_inputs_shape_.count(key) != 0) { + for (auto &input_shapes : *(original_optim_inputs_shape_[key])) { + shapes->push_back(input_shapes); + } } + optimizer->ReInit(shapes); + optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_); + optimizer->Execute(inputs, workspaces, outputs); + optim_info->Reset(); } - optimizer->ReInit(shapes); - optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_); - optimizer->Execute(inputs, workspaces, outputs); - optim_info->Reset(); if (!is_embedding_[key]) { tokens_[key] = worker_num_; } @@ -545,23 +544,26 @@ template void ParameterServer::AccumGrad(const Keys &keys, const Values &values, const Lengths &lengths) { std::unique_lock lock(mutex_); const Key &key = keys[0]; - std::shared_ptr optim_info = optim_infos_[key]; + bool no_sparse_grad = values.size() == 1 && values[0] == -100; + if (!no_sparse_grad) { + std::shared_ptr optim_info = optim_infos_[key]; - // Create or update the optimizer info - if (optim_info == nullptr) { - const std::shared_ptr &builder = optim_info_builders_[weight_key_to_optims_[key]]; - std::shared_ptr pserver_kernel = optimizers_[key]; - if (pserver_kernel == nullptr) { - MS_LOG(EXCEPTION) << "no optimizer found for key " << key << " optim name " << weight_key_to_optims_[key]; + // Create or update the optimizer info + if (optim_info == nullptr) { + const std::shared_ptr &builder = optim_info_builders_[weight_key_to_optims_[key]]; + std::shared_ptr pserver_kernel = optimizers_[key]; + if (pserver_kernel == nullptr) { + MS_LOG(EXCEPTION) << "no optimizer found for key " << key << " optim name " << weight_key_to_optims_[key]; + } + MS_EXCEPTION_IF_NULL(pserver_kernel); + OptimizerInfo *optim = + builder->Build(pserver_kernel, weights_[key], keys, values, lengths, optim_inputs_shape_[key], worker_num_); + optim_info.reset(optim); + optim_infos_[key] = optim_info; + } else { + optim_info->Update(values, lengths); + optim_info->Accumulate(values, lengths); } - MS_EXCEPTION_IF_NULL(pserver_kernel); - OptimizerInfo *optim = - builder->Build(pserver_kernel, weights_[key], keys, values, lengths, optim_inputs_shape_[key], worker_num_); - optim_info.reset(optim); - optim_infos_[key] = optim_info; - } else { - optim_info->Update(values, lengths); - optim_info->Accumulate(values, lengths); } grads_accum_counter_[key] += 1; diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h index 35037c11d3f..8896dabc4b6 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h +++ b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h @@ -112,7 +112,7 @@ class WorkerProxy : public ::ps::KVWorker { std::unique_ptr<::ps::Customer> general_customer_; std::unordered_map<::ps::Key, std::shared_ptr>> embedding_table_ranges_; std::unordered_map>> lookup_results_; - std::unordered_map> gathered_response_; + std::unordered_map>> gathered_response_; std::mutex mutex_; Slicer lookup_slicer_; Slicer sparse_slicer_; @@ -337,12 +337,19 @@ int WorkerProxy::AddGeneralRspCB(const ::ps::SArray<::ps::Key> &keys, ::ps::S int ts = general_customer_->NewRequest(::ps::kServerGroup); const auto &callback = [this, ts, keys, vals, lens, cb]() mutable { mutex_.lock(); - auto &kvs = gathered_response_[ts]; + std::map> server_kvs = gathered_response_[ts]; mutex_.unlock(); - *vals = kvs.vals; - if (lens) { - *lens = kvs.lens; + vals->clear(); + for (auto kvs : server_kvs) { + for (auto val : kvs.second.vals) { + vals->push_back(val); + } + if (lens) { + for (auto len : kvs.second.lens) { + lens->push_back(len); + } + } } mutex_.lock(); @@ -464,43 +471,50 @@ void WorkerProxy::SparseSlicer(int timestamp, const ::ps::KVPairs &send, c } } size_t indices_size = indice_ids.size(); - int slice_segment_size = indices_size * segment_size; - T *src_grad_data = new T[slice_segment_size]; - int *src_indice_data = new int[indices_size]; - PrepareSparseGradient(begin, end, distinct_ids, indice_to_grads, indice_data, segment_size, src_grad_data, - src_indice_data); + if (indices_size > 0) { + int slice_segment_size = indices_size * segment_size; + T *src_grad_data = new T[slice_segment_size]; + int *src_indice_data = new int[indices_size]; + PrepareSparseGradient(begin, end, distinct_ids, indice_to_grads, indice_data, segment_size, src_grad_data, + src_indice_data); - // Reduce the sparse gradient and indice - T *new_grad = new T[slice_segment_size]; - int *new_indices = new int[indices_size]; - mindspore::kernel::SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size}); - Util::ReduceSparseGradient(src_grad_data, src_indice_data, indices_size, segment_size, first_dim_size, - outer_dim_size, &unique_sparse_grad); + // Reduce the sparse gradient and indice + T *new_grad = new T[slice_segment_size]; + int *new_indices = new int[indices_size]; + mindspore::kernel::SparseGradient unique_sparse_grad({new_grad, new_indices, indices_size}); + Util::ReduceSparseGradient(src_grad_data, src_indice_data, indices_size, segment_size, first_dim_size, + outer_dim_size, &unique_sparse_grad); - // Update the length of reduce sparse gradient and indice - ::ps::SArray reduced_lens; - reduced_lens.CopyFrom(kvs.lens); - reduced_lens[grad_index] = unique_sparse_grad.indices_size_ * segment_size; - reduced_lens[indice_index] = unique_sparse_grad.indices_size_; + // Update the length of reduce sparse gradient and indice + ::ps::SArray reduced_lens; + reduced_lens.CopyFrom(kvs.lens); + reduced_lens[grad_index] = unique_sparse_grad.indices_size_ * segment_size; + reduced_lens[indice_index] = unique_sparse_grad.indices_size_; - // Build the sparse value to be sent - size_t total_size = 0; - for (auto size : reduced_lens) { - total_size += size; + // Build the sparse value to be sent + size_t total_size = 0; + for (auto size : reduced_lens) { + total_size += size; + } + ::ps::SArray reduced_data(total_size, 0); + BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_, + unique_sparse_grad.indices_, &reduced_data); + + kvs.lens = reduced_lens; + kvs.vals = reduced_data; } - ::ps::SArray reduced_data(total_size, 0); - BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_, - unique_sparse_grad.indices_, &reduced_data); - - kvs.lens = reduced_lens; - kvs.vals = reduced_data; if (indices_size <= 0) { - sliced->at(i).first = false; - } else { - sliced->at(i).first = true; - expected_result_count_[timestamp] += 1; + ::ps::SArray no_keys; + ::ps::SArray no_vals; + ::ps::SArray no_lens; + no_keys.push_back(key); + no_vals.push_back(-100); + kvs.vals = no_vals; + kvs.lens = no_lens; } + sliced->at(i).first = true; + expected_result_count_[timestamp] += 1; } } @@ -554,8 +568,8 @@ void WorkerProxy::BuildSparseValue(const ::ps::SArray &lengths, const si } // Fill the reduced indice + int indice_offset = grad_offset + lengths[grad_index]; data_size = lengths[indice_index] * sizeof(T); - int indice_offset = grad_offset + data_size; T *indice_data = reduced_data->data() + indice_offset; T *convert = new T[lengths[indice_index]]; for (int i = 0; i < lengths[indice_index]; i++) { @@ -656,7 +670,7 @@ void WorkerProxy::ProcessLookupResult(const ::ps::Message &msg) { lookup_results_[ts].push_back(kvs); mutex_.unlock(); } - if (lookup_customer_->NumResponse(ts) == expected_result_count_[ts] - 1) { + if (lookup_customer_->NumResponse(ts) + 1 == server_num_) { const auto &cb = lookup_callbacks_[ts]; cb(); lookup_callbacks_.erase(ts); @@ -676,15 +690,8 @@ void WorkerProxy::ProcessResponse(const ::ps::Message &msg) { kvs.lens = msg.data[2]; } mutex_.lock(); - for (auto key : kvs.keys) { - gathered_response_[ts].keys.push_back(key); - } - for (auto val : kvs.vals) { - gathered_response_[ts].vals.push_back(val); - } - for (auto len : kvs.lens) { - gathered_response_[ts].lens.push_back(len); - } + int rsp_server_rank = ::ps::Postoffice::Get()->IDtoRank(msg.meta.sender); + gathered_response_[ts][rsp_server_rank] = kvs; mutex_.unlock(); if (general_customer_->NumResponse(ts) + 1 == server_num_) { const auto &cb = general_callbacks_[ts];