!3300 Optimize server code

Merge pull request !3300 from ZPaC/new-feature-of-ps
This commit is contained in:
mindspore-ci-bot 2020-07-22 12:35:28 +08:00 committed by Gitee
commit 21783ce221
12 changed files with 203 additions and 14 deletions

View File

@ -42,6 +42,7 @@ if (NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc") list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc")
list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_adam_ps_kernel.cc") list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_adam_ps_kernel.cc")
list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_ftrl_ps_kernel.cc") list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_ftrl_ps_kernel.cc")
list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc")
endif() endif()
if (ENABLE_GPU) if (ENABLE_GPU)

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_CPU_KERNEL_H_ #ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_PS_KERNEL_H_
#define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_PS_KERNEL_H_ #define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_PS_KERNEL_H_
#include <vector> #include <vector>

View File

@ -0,0 +1,104 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.h"
#include <memory>
#include "backend/kernel_compiler/common_utils.h"
#include "runtime/device/cpu/cpu_device_address.h"
#include "frontend/parallel/ps/util.h"
namespace mindspore {
namespace kernel {
namespace ps {
void SparseApplyLazyAdamPSKernel::InitKernel(
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
std::vector<size_t> &var_shape = *(shape_vec[0]);
std::vector<size_t> &m_shape = *(shape_vec[1]);
std::vector<size_t> &v_shape = *(shape_vec[2]);
const std::vector<size_t> &grad_shape = *(shape_vec[9]);
const std::vector<size_t> &indices_shape = *(shape_vec[10]);
Shard(&var_shape, 0);
Shard(&m_shape, 0);
Shard(&v_shape, 0);
if (!IsSameShape(var_shape, m_shape)) {
MS_LOG(EXCEPTION) << "var and m should have the same shape";
}
if (!IsSameShape(var_shape, v_shape)) {
MS_LOG(EXCEPTION) << "var and v should have the same shape";
}
var_first_dim_size_ = var_shape[0];
for (size_t i = 1; i < var_shape.size(); ++i) {
if (var_shape[i] != grad_shape[i]) {
MS_LOG(EXCEPTION) << "The shape of var and grad must equal in dimension " << i;
}
var_outer_dim_size_ *= var_shape[i];
}
if (indices_shape.size() != 1) {
MS_LOG(EXCEPTION) << "indices must be 1D";
}
indices_size_ = indices_shape[0];
if (grad_shape[0] != indices_size_) {
MS_LOG(ERROR) << "The first dimension of grad shape must be equal to indices";
}
/*
if (AnfAlgo::HasNodeAttr(USE_NESTEROV, kernel_node)) {
use_nesterov_ = AnfAlgo::GetNodeAttr<bool>(kernel_node, "use_nesterov");
}
*/
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
workspace_size_list_.emplace_back(var_first_dim_size_ * var_outer_dim_size_ * sizeof(float));
}
void SparseApplyLazyAdamPSKernel::ReInit(
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
const std::vector<size_t> &indices_shape = *(shape_vec[0]);
indices_size_ = indices_shape[0];
workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
workspace_size_list_[1] = indices_size_ * sizeof(int);
}
void SparseApplyLazyAdamPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
const auto &indices_addr = inputs[10];
indices_size_ = indices_addr->size / sizeof(int);
workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
workspace_size_list_[1] = indices_size_ * sizeof(int);
}
bool SparseApplyLazyAdamPSKernel::Execute(const std::vector<AddressPtr> &inputs,
const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) {
ReInit(inputs);
int *indices = reinterpret_cast<int *>(inputs[10]->addr);
for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
indices[i] -= rank_id_ * var_first_dim_size_;
}
return Launch(inputs, workspace, outputs);
}
const std::vector<size_t> &SparseApplyLazyAdamPSKernel::input_sizes() const { return GetInputSizeList(); }
const std::vector<size_t> &SparseApplyLazyAdamPSKernel::output_sizes() const { return GetOutputSizeList(); }
const std::vector<size_t> &SparseApplyLazyAdamPSKernel::workspace_sizes() const { return GetWorkspaceSizeList(); }
} // namespace ps
} // namespace kernel
} // namespace mindspore

View File

@ -0,0 +1,49 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_
#define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_
#include <vector>
#include <memory>
#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
#include "backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.h"
namespace mindspore {
namespace kernel {
namespace ps {
using mindspore::kernel::SparseApplyLazyAdamCPUKernel;
class SparseApplyLazyAdamPSKernel : public SparseApplyLazyAdamCPUKernel, public PServerKernel {
public:
SparseApplyLazyAdamPSKernel(size_t rank_id, size_t pserver_num) : PServerKernel(rank_id, pserver_num) {}
~SparseApplyLazyAdamPSKernel() override = default;
void InitKernel(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override;
const std::vector<size_t> &input_sizes() const override;
const std::vector<size_t> &output_sizes() const override;
const std::vector<size_t> &workspace_sizes() const override;
protected:
void ReInit(const std::vector<AddressPtr> &) override;
};
} // namespace ps
} // namespace kernel
} // namespace mindspore
#endif // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_

View File

@ -33,7 +33,7 @@ class SparseApplyLazyAdamCPUKernel : public CPUKernel {
bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace, bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override; const std::vector<AddressPtr> &outputs) override;
private: protected:
size_t indices_size_{0}; size_t indices_size_{0};
size_t var_first_dim_size_{0}; size_t var_first_dim_size_{0};
size_t var_outer_dim_size_{1}; size_t var_outer_dim_size_{1};

View File

@ -63,6 +63,7 @@ constexpr int kInitWeightToOptimIdCmd = 11;
constexpr int kInitOptimInputsShapeCmd = 12; constexpr int kInitOptimInputsShapeCmd = 12;
constexpr int kInitEmbeddingsCmd = 20; constexpr int kInitEmbeddingsCmd = 20;
constexpr int kEmbeddingLookupCmd = 30; constexpr int kEmbeddingLookupCmd = 30;
constexpr int kFinalizeCmd = 40;
constexpr size_t kInvalidKey = UINT64_MAX; constexpr size_t kInvalidKey = UINT64_MAX;

View File

@ -57,6 +57,16 @@ void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
} }
} }
void DenseOptimInfo::ComputeMean(size_t n) {
if (n > 1) {
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
size_t size = gradient()->size / sizeof(float);
for (size_t i = 0; i < size; i++) {
accum_grad_data[i] /= n;
}
}
}
void DenseOptimInfo::Reset() { memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size); } void DenseOptimInfo::Reset() { memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size); }
void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) { void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {

View File

@ -33,6 +33,7 @@ class OptimizerInfo {
virtual void Update(const Values &values, const Lengths &lengths) {} virtual void Update(const Values &values, const Lengths &lengths) {}
virtual void UpdateWeight(const WeightPtr &weight); virtual void UpdateWeight(const WeightPtr &weight);
virtual void Accumulate(const Values &values, const Lengths &lengths) = 0; virtual void Accumulate(const Values &values, const Lengths &lengths) = 0;
virtual void ComputeMean(size_t n) {}
virtual void Reset() {} virtual void Reset() {}
void AddWorkspace(const AddressPtr &workspace); void AddWorkspace(const AddressPtr &workspace);
@ -58,6 +59,7 @@ class DenseOptimInfo : public OptimizerInfo {
~DenseOptimInfo() override = default; ~DenseOptimInfo() override = default;
void Accumulate(const Values &values, const Lengths &lens) override; void Accumulate(const Values &values, const Lengths &lens) override;
void ComputeMean(size_t n) override;
void Reset() override; void Reset() override;
}; };

View File

@ -41,7 +41,7 @@
#include "backend/kernel_compiler/kernel.h" #include "backend/kernel_compiler/kernel.h"
#include "backend/kernel_compiler/cpu/cpu_kernel_factory.h" #include "backend/kernel_compiler/cpu/cpu_kernel_factory.h"
#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h" #include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
#include "backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.h" #include "backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.h"
#include "backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.h" #include "backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.h"
#include "backend/kernel_compiler/cpu/ps/apply_momentum_ps_kernel.h" #include "backend/kernel_compiler/cpu/ps/apply_momentum_ps_kernel.h"
#include "backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.h" #include "backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.h"
@ -90,6 +90,7 @@ class ParameterServer {
void HandleInitInputsShape(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res); void HandleInitInputsShape(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
void HandleInitEmbeddings(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res); void HandleInitEmbeddings(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
void HandleEmbeddingLookup(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res); void HandleEmbeddingLookup(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
void HandleFinalize(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
ParameterServer *ps_; ParameterServer *ps_;
typedef void (ServerHandler::*RequestHandler)(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, typedef void (ServerHandler::*RequestHandler)(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
@ -165,6 +166,7 @@ void ParameterServer<T>::ServerHandler::Init() {
handlers_[kInitOptimInputsShapeCmd] = &ServerHandler::HandleInitInputsShape; handlers_[kInitOptimInputsShapeCmd] = &ServerHandler::HandleInitInputsShape;
handlers_[kInitEmbeddingsCmd] = &ServerHandler::HandleInitEmbeddings; handlers_[kInitEmbeddingsCmd] = &ServerHandler::HandleInitEmbeddings;
handlers_[kEmbeddingLookupCmd] = &ServerHandler::HandleEmbeddingLookup; handlers_[kEmbeddingLookupCmd] = &ServerHandler::HandleEmbeddingLookup;
handlers_[kFinalizeCmd] = &ServerHandler::HandleFinalize;
} }
template <typename T> template <typename T>
@ -256,16 +258,16 @@ void ParameterServer<T>::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta
ps_->DoEmbeddingLookup(key, req_data.keys.segment(1, req_data.keys.size()), res); ps_->DoEmbeddingLookup(key, req_data.keys.segment(1, req_data.keys.size()), res);
} }
template <typename T>
void ParameterServer<T>::ServerHandler::HandleFinalize(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
::ps::Finalize(0, false);
}
template <typename T> template <typename T>
bool ParameterServer<T>::Init(const FuncGraphPtr &func_graph) { bool ParameterServer<T>::Init(const FuncGraphPtr &func_graph) {
const char *server_num = getenv(kEnvPServerNum); pserver_num_ = ::ps::NumServers();
const char *worker_num = getenv(kEnvWorkerNum); worker_num_ = ::ps::NumWorkers();
if (server_num != nullptr) {
pserver_num_ = *server_num - '0';
}
if (worker_num != nullptr) {
worker_num_ = *worker_num - '0';
}
func_graph_ = func_graph; func_graph_ = func_graph;
rank_id_ = ::ps::MyRank(); rank_id_ = ::ps::MyRank();
handler_.reset(new ServerHandler(this)); handler_.reset(new ServerHandler(this));
@ -319,7 +321,7 @@ void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &va
if (optimizers_.count(key) == 0 && optim_inputs_shape_.count(key) > 0) { if (optimizers_.count(key) == 0 && optim_inputs_shape_.count(key) > 0) {
if (optim_name == kSparseAdam) { if (optim_name == kSparseAdam) {
std::shared_ptr<PServerKernel> optimizer = std::shared_ptr<PServerKernel> optimizer =
std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(rank_id_, pserver_num_); std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(rank_id_, pserver_num_);
optimizer->InitKernel(optim_inputs_shape_[key]); optimizer->InitKernel(optim_inputs_shape_[key]);
optimizers_[key] = optimizer; optimizers_[key] = optimizer;
} else if (optim_name == kApplyMomentum) { } else if (optim_name == kApplyMomentum) {
@ -368,10 +370,11 @@ void ParameterServer<T>::InitEmbeddingTable(
} }
WeightPtr embedding = std::make_shared<Weight>(total_dims, 0); WeightPtr embedding = std::make_shared<Weight>(total_dims, 0);
T *embedding_data = embedding->data();
std::default_random_engine engine; std::default_random_engine engine;
std::normal_distribution<float> random(0, 0.01); std::normal_distribution<float> random(0, 0.01);
for (size_t i = 0; i < total_dims; i++) { for (size_t i = 0; i < total_dims; i++) {
(*embedding)[i] = random(engine); embedding_data[i] = random(engine);
} }
weights_[key] = embedding; weights_[key] = embedding;
@ -402,6 +405,7 @@ void ParameterServer<T>::UpdateWeights() {
const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces(); const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs(); const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();
optim_info->ComputeMean(worker_num_);
optimizer->Execute(inputs, workspaces, outputs); optimizer->Execute(inputs, workspaces, outputs);
optim_info->Reset(); optim_info->Reset();
} }

View File

@ -50,6 +50,7 @@ class Worker {
void InitPSParamAndOptim(const std::string &param_name, void *param_data, size_t param_size); void InitPSParamAndOptim(const std::string &param_name, void *param_data, size_t param_size);
void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids, void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd); const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd);
void Finalize();
private: private:
Worker() : kv_worker_(nullptr), running_(false), key_cnt_(0) {} Worker() : kv_worker_(nullptr), running_(false), key_cnt_(0) {}
@ -118,6 +119,11 @@ void Worker<T>::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const :
kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd); kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd);
} }
template <typename T>
void Worker<T>::Finalize() {
kv_worker_->Finalize();
}
template <typename T> template <typename T>
void Worker<T>::InitPSParamData(const std::vector<size_t> &keys, void *origin_addr, size_t size) { void Worker<T>::InitPSParamData(const std::vector<size_t> &keys, void *origin_addr, size_t size) {
::ps::SArray<T> addr(reinterpret_cast<T *>(origin_addr), size / sizeof(T)); ::ps::SArray<T> addr(reinterpret_cast<T *>(origin_addr), size / sizeof(T));

View File

@ -58,6 +58,7 @@ class WorkerProxy : public ::ps::KVWorker<T> {
const ::ps::SArray<int> &lens = {}, const Callback &cb = nullptr, int priority = 0); const ::ps::SArray<int> &lens = {}, const Callback &cb = nullptr, int priority = 0);
void PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals, const ::ps::SArray<int> &lens = {}, void PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals, const ::ps::SArray<int> &lens = {},
int cmd = 0, int priority = 0); int cmd = 0, int priority = 0);
void Finalize();
private: private:
template <typename C> template <typename C>
@ -146,6 +147,17 @@ void WorkerProxy<T>::PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::S
obj_->WaitRequest(ts); obj_->WaitRequest(ts);
} }
template <typename T>
void WorkerProxy<T>::Finalize() {
int ts = obj_->NewRequest(::ps::kServerGroup);
::ps::KVPairs<T> kvs;
kvs.keys.push_back(0);
kvs.vals.push_back(0.0f);
Send(obj_, ts, true, false, kFinalizeCmd, kvs, broadcast_slicer_);
obj_->WaitRequest(ts);
::ps::Finalize(0, false);
}
template <typename T> template <typename T>
template <typename C> template <typename C>
int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids, int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,

View File

@ -75,7 +75,7 @@ def train(net, data, label):
print(res) print(res)
print("+++++++++++++++++++++++++++") print("+++++++++++++++++++++++++++")
diff = res.asnumpy()[0] - 2.3025851 diff = res.asnumpy()[0] - 2.3025851
assert np.all(diff < 1.e-7) assert np.all(diff < 1.e-6)
@pytest.mark.level0 @pytest.mark.level0