From 8273c2a39cf50abea6bddc9494f3040b52a5f736 Mon Sep 17 00:00:00 2001
From: ZPaC
Date: Sat, 11 Jul 2020 15:06:42 +0800
Subject: [PATCH] Add ps optimizer info builder.

---
 mindspore/ccsrc/parallel/CMakeLists.txt        |   2 +-
 mindspore/ccsrc/parallel/ps/common.h           |  87 +++++++++
 .../parallel/ps/optimizer_info_builder.cc      | 184 ++++++++++++++++++
 .../parallel/ps/optimizer_info_builder.h       |  66 +++++++
 tests/ut/cpp/CMakeLists.txt                    |   1 +
 5 files changed, 339 insertions(+), 1 deletion(-)
 create mode 100644 mindspore/ccsrc/parallel/ps/common.h
 create mode 100644 mindspore/ccsrc/parallel/ps/optimizer_info_builder.cc
 create mode 100644 mindspore/ccsrc/parallel/ps/optimizer_info_builder.h

diff --git a/mindspore/ccsrc/parallel/CMakeLists.txt b/mindspore/ccsrc/parallel/CMakeLists.txt
index 9b1c732f58b..76ac2cfcd76 100644
--- a/mindspore/ccsrc/parallel/CMakeLists.txt
+++ b/mindspore/ccsrc/parallel/CMakeLists.txt
@@ -1,5 +1,5 @@
 file(GLOB_RECURSE _PARALLEL_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
-list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/util.cc" "ps/scheduler.cc" "ps/optimizer_info.cc")
+list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/util.cc" "ps/scheduler.cc" "ps/optimizer_info.cc" "ps/optimizer_info_builder.cc")
 if (ENABLE_DUMP_PROTO)
     list(REMOVE_ITEM _PARALLEL_SRC_FILES "parallel/strategy_checkpoint/parallel_strategy_checkpoint.cc")
 endif ()
diff --git a/mindspore/ccsrc/parallel/ps/common.h b/mindspore/ccsrc/parallel/ps/common.h
new file mode 100644
index 00000000000..5e136c816f2
--- /dev/null
+++ b/mindspore/ccsrc/parallel/ps/common.h
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_COMMON_H_
+#define MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_COMMON_H_
+
+#include <iostream>
+#include <vector>
+#include <memory>
+#include "ps/ps.h"
+
+namespace mindspore {
+namespace parallel {
+namespace ps {
+constexpr char kEnvCommType[] = "MS_COMM_TYPE";
+constexpr char kEnvInterface[] = "MS_INTERFACE";
+constexpr char kEnvPServerNum[] = "MS_SERVER_NUM";
+constexpr char kEnvWorkerNum[] = "MS_WORKER_NUM";
+constexpr char kEnvSchedulerHost[] = "MS_SCHED_HOST";
+constexpr char kEnvSchedulerPort[] = "MS_SCHED_PORT";
+
+constexpr char kEnvRole[] = "MS_ROLE";
+constexpr char kEnvRoleOfPServer[] = "MS_PSERVER";
+constexpr char kEnvRoleOfWorker[] = "MS_WORKER";
+constexpr char kEnvRoleOfScheduler[] = "MS_SCHED";
+
+constexpr char kDmlcCommType[] = "DMLC_PS_VAN_TYPE";
+constexpr char kDmlcInterface[] = "DMLC_INTERFACE";
+constexpr char kDmlcPServerNum[] = "DMLC_NUM_SERVER";
+constexpr char kDmlcWorkerNum[] = "DMLC_NUM_WORKER";
+constexpr char kDmlcRole[] = "DMLC_ROLE";
+constexpr char kDmlcSchedulerHost[] = "DMLC_PS_ROOT_URI";
+constexpr char kDmlcSchedulerPort[] = "DMLC_PS_ROOT_PORT";
+
+constexpr char kCommTypeOfIBVerbs[] = "ibverbs";
+constexpr char kCommTypeOfTCP[] = "zmq";
+constexpr char kRoleOfPServer[] = "server";
+constexpr char kRoleOfWorker[] = "worker";
+constexpr char kRoleOfScheduler[] = "scheduler";
+
+constexpr char kLearningRate[] = "learning_rate";
+constexpr char kMomentum[] = "momentum";
+
+constexpr char kApplyMomentum[] = "ApplyMomentum";
+constexpr char kSparseAdam[] = "Adam";
+constexpr char kSparseFtrl[] = "Ftrl";
+
+constexpr int kInitWeightsCmd = 10;
+constexpr int kInitWeightToOptimIdCmd = 11;
+constexpr int kInitOptimInputsShapeCmd = 12;
+constexpr int kInitEmbeddingsCmd = 20;
+constexpr int kEmbeddingLookupCmd = 30;
+
+constexpr size_t kInvalidKey = UINT64_MAX;
+
+using Key = ::ps::Key;
+using Keys = ::ps::SArray<Key>;
+using Values = ::ps::SArray<float>;
+using ValuesPtr = std::shared_ptr<Values>;
+using Weight = ::ps::SArray<float>;
+using Grad = ::ps::SArray<float>;
+using LookupIds = ::ps::SArray<int>;
+using Lengths = ::ps::SArray<int>;
+using WeightPtr = std::shared_ptr<Weight>;
+using GradPtr = std::shared_ptr<Grad>;
+// using EmbeddingTable = std::unordered_map;
+// using EmbeddingTable = ::ps::SArray;
+// using EmbeddingTablePtr = std::shared_ptr;
+using InputsShape = std::vector<std::shared_ptr<std::vector<size_t>>>;
+using InputsShapePtr = std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>>;
+}  // namespace ps
+}  // namespace parallel
+}  // namespace mindspore
+#endif  // MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_COMMON_H_
diff --git a/mindspore/ccsrc/parallel/ps/optimizer_info_builder.cc b/mindspore/ccsrc/parallel/ps/optimizer_info_builder.cc
new file mode 100644
index 00000000000..02c99c49592
--- /dev/null
+++ b/mindspore/ccsrc/parallel/ps/optimizer_info_builder.cc
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "parallel/ps/optimizer_info_builder.h"
+#include <vector>
+#include <memory>
+#include <functional>
+
+namespace mindspore {
+namespace parallel {
+namespace ps {
+OptimizerInfo *OptimizerInfoBuilder::Build(const std::shared_ptr<PServerKernel> &pserver_kernel,
+                                           const WeightPtr &weight, const Keys &keys, const Values &values,
+                                           const Lengths &lens, const InputsShapePtr &inputs_shape,
+                                           size_t worker_num) {
+  OptimizerInfo *optim_info = BuildInputs(weight, keys, values, lens, inputs_shape, worker_num);
+  std::vector<size_t> ws_sizes = pserver_kernel->workspace_sizes();
+  BuildWorkspaces(optim_info, ws_sizes, worker_num);
+  BuildOutputs(optim_info, worker_num);
+  return optim_info;
+}
+
+void OptimizerInfoBuilder::BuildWorkspaces(OptimizerInfo *info, const std::vector<size_t> &ws_sizes,
+                                           size_t worker_num) {
+  for (size_t i = 0; i < ws_sizes.size(); i++) {
+    size_t size = ws_sizes[i];
+    AddressPtr workspace = std::make_shared<kernel::Address>();
+    workspace->addr = new float[size];
+    workspace->size = size;
+    info->AddWorkspace(workspace);
+  }
+}
+
+OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values,
+                                                     const Lengths &lens, const InputsShapePtr &inputs_shape,
+                                                     size_t worker_num) {
+  AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  weight_addr->addr = weight->data();
+  weight_addr->size = weight->size();
+  void *data_ptr = values.data();
+  AddressPtr accumulate = std::make_shared<kernel::Address>();
+  accumulate->addr = new float[weight->size()];
+  accumulate->size = weight->size();
+  AddressPtr learning_rate = std::make_shared<kernel::Address>();
+  learning_rate->addr = data_ptr;
+  learning_rate->size = lens[0];
+  AddressPtr gradient = std::make_shared<kernel::Address>();
+  gradient->addr = reinterpret_cast<float *>(learning_rate->addr) + lens[0];
+  gradient->size = lens[1];
+  AddressPtr momentum = std::make_shared<kernel::Address>();
+  momentum->addr = reinterpret_cast<float *>(gradient->addr) + lens[1];
+  momentum->size = lens[2];
+
+  return new MomentumOptimInfo(weight_addr, accumulate, learning_rate, gradient, momentum);
+}
+
+OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values,
+                                                       const Lengths &lens, const InputsShapePtr &inputs_shape,
+                                                       size_t worker_num) {
+  AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  weight_addr->addr = weight->data();
+  weight_addr->size = weight->size();
+  AddressPtr m = std::make_shared<kernel::Address>();
+  m->addr = new float[weight->size()];
+  m->size = weight->size() * sizeof(float);
+  AddressPtr v = std::make_shared<kernel::Address>();
+  v->addr = new float[weight->size()];
+  v->size = weight->size() * sizeof(float);
+
+  void *data_ptr = values.data();
+  void *copy_data_ptr = new float[values.size()];
+  auto ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float));
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+
+  AddressPtr beta1_power = std::make_shared<kernel::Address>();
+  beta1_power->addr = copy_data_ptr;
+  beta1_power->size = lens[0] * sizeof(float);
+  AddressPtr beta2_power = std::make_shared<kernel::Address>();
+  beta2_power->addr = reinterpret_cast<float *>(beta1_power->addr) + lens[0];
+  beta2_power->size = lens[1] * sizeof(float);
+
+  AddressPtr learning_rate = std::make_shared<kernel::Address>();
+  learning_rate->addr = reinterpret_cast<float *>(beta2_power->addr) + lens[1];
+  learning_rate->size = lens[2] * sizeof(float);
+
+  AddressPtr beta1 = std::make_shared<kernel::Address>();
+  beta1->addr = reinterpret_cast<float *>(learning_rate->addr) + lens[2];
+  beta1->size = lens[3] * sizeof(float);
+
+  AddressPtr beta2 = std::make_shared<kernel::Address>();
+  beta2->addr = reinterpret_cast<float *>(beta1->addr) + lens[3];
+  beta2->size = lens[4] * sizeof(float);
+
+  AddressPtr epsilon = std::make_shared<kernel::Address>();
+  epsilon->addr = reinterpret_cast<float *>(beta2->addr) + lens[4];
+  epsilon->size = lens[5] * sizeof(float);
+
+  const std::shared_ptr<std::vector<size_t>> &grad_shape = (*inputs_shape)[9];
+  size_t total_grad_size =
+    std::accumulate((*grad_shape).begin(), (*grad_shape).end(), sizeof(float), std::multiplies<size_t>());
+  AddressPtr grad = std::make_shared<kernel::Address>();
+  grad->addr = new float[total_grad_size * worker_num];
+  auto ret2 = memcpy_s(grad->addr, lens[6] * sizeof(float), reinterpret_cast<float *>(epsilon->addr) + lens[5],
+                       lens[6] * sizeof(float));
+  if (ret2 != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
+  }
+  grad->size = lens[6] * sizeof(float);
+
+  const std::shared_ptr<std::vector<size_t>> &indices_shape = (*inputs_shape)[10];
+  size_t total_indice_size =
+    std::accumulate((*indices_shape).begin(), (*indices_shape).end(), sizeof(float), std::multiplies<size_t>());
+  AddressPtr indices = std::make_shared<kernel::Address>();
+  indices->addr = new float[total_indice_size * worker_num];
+  auto ret3 = memcpy_s(indices->addr, lens[7] * sizeof(float),
+                       reinterpret_cast<float *>(epsilon->addr) + lens[5] + lens[6], lens[7] * sizeof(float));
+  if (ret3 != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret3 << ")";
+  }
+  indices->size = lens[7] * sizeof(float);
+
+  return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon,
+                                 grad, indices, total_grad_size, total_indice_size);
+}
+
+OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values,
+                                                       const Lengths &lens, const InputsShapePtr &inputs_shape,
+                                                       size_t worker_num) {
+  AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  weight_addr->addr = weight->data();
+  weight_addr->size = weight->size();
+  AddressPtr accum = std::make_shared<kernel::Address>();
+  accum->addr = new float[weight->size()];
+  accum->size = weight->size() * sizeof(float);
+  for (size_t i = 0; i < weight->size(); i++) {
+    float *tmp = reinterpret_cast<float *>(accum->addr);
+    tmp[i] = 1.0;
+  }
+  AddressPtr linear = std::make_shared<kernel::Address>();
+  linear->addr = new float[weight->size()];
+  memset_s(linear->addr, weight->size() * sizeof(float), 0x00, weight->size() * sizeof(float));
+  linear->size = weight->size() * sizeof(float);
+
+  const std::shared_ptr<std::vector<size_t>> &grad_shape = (*inputs_shape)[3];
+  size_t total_grad_size =
+    std::accumulate((*grad_shape).begin(), (*grad_shape).end(), 1, std::multiplies<size_t>());
+  AddressPtr grad = std::make_shared<kernel::Address>();
+  grad->addr = new float[total_grad_size * worker_num];
+  auto ret = memcpy_s(grad->addr, lens[0] * sizeof(float), values.data(), lens[0] * sizeof(float));
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+  }
+  grad->size = lens[0] * sizeof(float);
+
+  const std::shared_ptr<std::vector<size_t>> &indices_shape = (*inputs_shape)[4];
+  size_t total_indice_size =
+    std::accumulate((*indices_shape).begin(), (*indices_shape).end(), 1, std::multiplies<size_t>());
+  AddressPtr indices = std::make_shared<kernel::Address>();
+  indices->addr = new float[total_indice_size * worker_num];
+  auto ret2 = memcpy_s(indices->addr, lens[1] * sizeof(float), reinterpret_cast<float *>(values.data()) + lens[0],
+                       lens[1] * sizeof(float));
+  if (ret2 != 0) {
+    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
+  }
+  indices->size = lens[1] * sizeof(float);
+
+  return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices, total_grad_size, total_indice_size);
+}
+}  // namespace ps
+}  // namespace parallel
+}  // namespace mindspore
diff --git a/mindspore/ccsrc/parallel/ps/optimizer_info_builder.h b/mindspore/ccsrc/parallel/ps/optimizer_info_builder.h
new file mode 100644
index 00000000000..0703f5e755e
--- /dev/null
+++ b/mindspore/ccsrc/parallel/ps/optimizer_info_builder.h
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_OPTIMIZER_INFO_BUILDER_H_
+#define MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_OPTIMIZER_INFO_BUILDER_H_
+
+#include <vector>
+#include <memory>
+#include "kernel/kernel.h"
+#include "kernel/ps/pserver_kernel.h"
+#include "parallel/ps/optimizer_info.h"
+
+namespace mindspore {
+namespace parallel {
+namespace ps {
+using mindspore::kernel::KernelMod;
+using mindspore::kernel::ps::PServerKernel;
+class OptimizerInfoBuilder {
+ public:
+  OptimizerInfoBuilder() = default;
+  virtual ~OptimizerInfoBuilder() = default;
+
+  OptimizerInfo *Build(const std::shared_ptr<PServerKernel> &pserver_kernel, const WeightPtr &weight, const Keys &keys,
+                       const Values &values, const Lengths &lens, const InputsShapePtr &inputs_shape,
+                       size_t worker_num);
+
+  virtual OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values,
+                                     const Lengths &lens, const InputsShapePtr &inputs_shape, size_t worker_num) = 0;
+
+  virtual void BuildWorkspaces(OptimizerInfo *info, const std::vector<size_t> &ws_sizes, size_t worker_num);
+  virtual void BuildOutputs(OptimizerInfo *info, size_t worker_num) {}
+};
+
+class MomentumOptimInfoBuilder : public OptimizerInfoBuilder {
+ public:
+  OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
+                             const InputsShapePtr &inputs_shape, size_t worker_num) override;
+};
+
+class SparseAdamOptimInfoBuilder : public OptimizerInfoBuilder {
+ public:
+  OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
+                             const InputsShapePtr &inputs_shape, size_t worker_num) override;
+};
+
+class SparseFtrlOptimInfoBuilder : public OptimizerInfoBuilder {
+ public:
+  OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
+                             const InputsShapePtr &inputs_shape, size_t worker_num) override;
+};
+}  // namespace ps
+}  // namespace parallel
+}  // namespace mindspore
+#endif  // MINDSPORE_MINDSPORE_CCSRC_PARALLEL_PS_OPTIMIZER_INFO_BUILDER_H_
diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt
index f0a3ecb4467..4e9fdaca81d 100644
--- a/tests/ut/cpp/CMakeLists.txt
+++ b/tests/ut/cpp/CMakeLists.txt
@@ -118,6 +118,7 @@ list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/parallel/strategy_
 list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/parallel/ps/util.cc")
 list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/parallel/ps/scheduler.cc")
 list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/parallel/ps/optimizer_info.cc")
+list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/parallel/ps/optimizer_info_builder.cc")
"../../../mindspore/ccsrc/utils/anf_ir.pb.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/utils/node_strategy.pb.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/utils/load_onnx/anf_model_parser.cc")