From e0da486e2fff7194e9f361d826d05392e1995a50 Mon Sep 17 00:00:00 2001 From: ZPaC Date: Thu, 9 Jul 2020 22:01:28 +0800 Subject: [PATCH] Add push pull kernels --- mindspore/ccsrc/kernel/CMakeLists.txt | 4 +- mindspore/ccsrc/kernel/cpu/ps/pull_kernel.cc | 25 ++++++ mindspore/ccsrc/kernel/cpu/ps/pull_kernel.h | 85 ++++++++++++++++++++ mindspore/ccsrc/kernel/cpu/ps/push_kernel.cc | 38 +++++++++ mindspore/ccsrc/kernel/cpu/ps/push_kernel.h | 80 ++++++++++++++++++ 5 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 mindspore/ccsrc/kernel/cpu/ps/pull_kernel.cc create mode 100644 mindspore/ccsrc/kernel/cpu/ps/pull_kernel.h create mode 100644 mindspore/ccsrc/kernel/cpu/ps/push_kernel.cc create mode 100644 mindspore/ccsrc/kernel/cpu/ps/push_kernel.h diff --git a/mindspore/ccsrc/kernel/CMakeLists.txt b/mindspore/ccsrc/kernel/CMakeLists.txt index ceea6b1a990..362e0c06195 100644 --- a/mindspore/ccsrc/kernel/CMakeLists.txt +++ b/mindspore/ccsrc/kernel/CMakeLists.txt @@ -25,7 +25,9 @@ if (ENABLE_CPU) file(GLOB_RECURSE CPU_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "cpu/*.cc" ) - + + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc" "cpu/ps/pull_kernel.cc") + if (NOT ENABLE_MPI) list(REMOVE_ITEM CPU_SRC_LIST "cpu/allgather_cpu_kernel.cc") list(REMOVE_ITEM CPU_SRC_LIST "cpu/reduce_scatter_cpu_kernel.cc") diff --git a/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.cc b/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.cc new file mode 100644 index 00000000000..90b5e2e64d5 --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.cc @@ -0,0 +1,25 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "kernel/cpu/ps/pull_kernel.h" + +namespace mindspore { +namespace kernel { +MS_REG_CPU_KERNEL_T( + Pull, KernelAttr().AddInputAttr(kNumberTypeUInt64).AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), + PullKernel, float); +} // namespace kernel +} // namespace mindspore diff --git a/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.h b/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.h new file mode 100644 index 00000000000..5cde0056171 --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/ps/pull_kernel.h @@ -0,0 +1,85 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ +#define MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ + +#include <vector> +#include <string> +#include "parallel/ps/worker.h" +#include "parallel/ps/util.h" +#include "kernel/cpu/cpu_kernel.h" +#include "kernel/cpu/cpu_kernel_factory.h" + +namespace mindspore { +namespace kernel { +template <typename T> +class PullKernel : public CPUKernel { + public: + PullKernel() : keys_size_(sizeof(size_t)), var_size_(sizeof(size_t)) {} + ~PullKernel() override = default; + + bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, const std::vector<AddressPtr> &) { + // If the parameter is an embedding table, don't Pull it from PServer. + if (param_name_.find("embedding") == std::string::npos && param_name_.find("wide_w") == std::string::npos) { + parallel::ps::Worker<T>::GetInstance().Pull(key_, inputs[1]->addr, inputs[1]->size); + } + return true; + } + void Init(const CNodePtr &kernel_node) { + size_t input_num = AnfAlgo::GetInputTensorNum(kernel_node); + if (input_num != 2) { + MS_LOG(ERROR) << "Input number is " << input_num << ", but pull needs 2 inputs."; + return; + } + + auto key_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 0); + for (size_t i = 0; i < key_shape.size(); i++) { + keys_size_ *= key_shape[i]; + } + auto var_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 1); + for (size_t i = 0; i < var_shape.size(); i++) { + var_size_ *= var_shape[i]; + } + auto param_node = AnfAlgo::GetInputNode(kernel_node, 1); + MS_EXCEPTION_IF_NULL(param_node); + param_name_ = param_node->fullname_with_scope(); + + if (mindspore::parallel::ps::Util::IsRoleOfWorker()) { + key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey); + } + InitSizeLists(); + return; + } + void InitKernel(const CNodePtr &kernel_node) { return; } + + protected: + void InitSizeLists() { + input_size_list_.push_back(keys_size_); + input_size_list_.push_back(var_size_); + output_size_list_.push_back(0); + } + + private: + size_t key_; + size_t keys_size_; + size_t var_size_; + 
std::string param_name_; +}; +} // namespace kernel +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ diff --git a/mindspore/ccsrc/kernel/cpu/ps/push_kernel.cc b/mindspore/ccsrc/kernel/cpu/ps/push_kernel.cc new file mode 100644 index 00000000000..a49c7e92071 --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/ps/push_kernel.cc @@ -0,0 +1,38 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "kernel/cpu/ps/push_kernel.h" + +namespace mindspore { +namespace kernel { +MS_REG_CPU_KERNEL_T(Push, + KernelAttr() + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeInt32) + .AddOutputAttr(kNumberTypeUInt64), + PushKernel, float); + +MS_REG_CPU_KERNEL_T( + Push, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeUInt64), + PushKernel, float); +} // namespace kernel +} // namespace mindspore diff --git a/mindspore/ccsrc/kernel/cpu/ps/push_kernel.h b/mindspore/ccsrc/kernel/cpu/ps/push_kernel.h new file mode 100644 index 00000000000..436bebd388e --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/ps/push_kernel.h @@ -0,0 +1,80 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_ +#define MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_ + +#include <vector> +#include <algorithm> +#include "parallel/ps/worker.h" +#include "parallel/ps/util.h" +#include "kernel/cpu/cpu_kernel.h" +#include "kernel/cpu/cpu_kernel_factory.h" + +namespace mindspore { +namespace kernel { +template <typename T> +class PushKernel : public CPUKernel { + public: + PushKernel() : key_(UINT64_MAX) {} + ~PushKernel() override = default; + + bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, + const std::vector<AddressPtr> &outputs) { + std::vector<size_t> keys; + std::vector<uintptr_t> addrs; + std::vector<int> sizes; + for (auto input : inputs) { + keys.push_back(key_); + addrs.push_back(reinterpret_cast<uintptr_t>(input->addr)); + sizes.push_back(SizeToInt(input->size) / sizeof(T)); + } + parallel::ps::Worker<T>::GetInstance().Push(keys, addrs, sizes); + memcpy(outputs[0]->addr, &key_, sizeof(size_t)); + return true; + } + + void Init(const CNodePtr &kernel_node) { + key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey); + auto optim_input_shapes = AnfAlgo::GetNodeAttr<std::vector<std::vector<int>>>(kernel_node, "optim_input_shapes"); + std::vector<int> only_shape_indices = AnfAlgo::GetNodeAttr<std::vector<int>>(kernel_node, "only_shape_indices"); + MS_LOG(INFO) << "Key " << key_ << " optimizer input shapes are:" << optim_input_shapes; + MS_LOG(INFO) << "Only init shape indices are " << only_shape_indices; + for (size_t i = 0; i < optim_input_shapes.size(); i++) { + auto shape = optim_input_shapes[i]; + 
mindspore::parallel::ps::Worker<T>::GetInstance().SetOptimInputShapes(key_, shape); + if (std::count(only_shape_indices.begin(), only_shape_indices.end(), i) == 0) { + size_t size = sizeof(T); + for (size_t j = 0; j < shape.size(); j++) { + size *= shape[j]; + } + input_size_list_.push_back(size); + } + } + + output_size_list_.push_back(sizeof(size_t)); + return; + } + + void InitKernel(const CNodePtr &kernel_node) { return; } + + private: + size_t key_; +}; +} // namespace kernel +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_