forked from mindspore-Ecosystem/mindspore
!2979 Add push pull kernel
Merge pull request !2979 from ZPaC/add-push-pull-kernel
Commit f201bd655c
CMakeLists.txt
@@ -25,7 +25,9 @@ if (ENABLE_CPU)
    file(GLOB_RECURSE CPU_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
        "cpu/*.cc"
    )

    list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc" "cpu/ps/pull_kernel.cc")

    if (NOT ENABLE_MPI)
        list(REMOVE_ITEM CPU_SRC_LIST "cpu/allgather_cpu_kernel.cc")
        list(REMOVE_ITEM CPU_SRC_LIST "cpu/reduce_scatter_cpu_kernel.cc")
New file: kernel/cpu/ps/pull_kernel.cc
@@ -0,0 +1,25 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "kernel/cpu/ps/pull_kernel.h"

namespace mindspore {
namespace kernel {
MS_REG_CPU_KERNEL_T(
  Pull, KernelAttr().AddInputAttr(kNumberTypeUInt64).AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32),
  PullKernel, float);
}  // namespace kernel
}  // namespace mindspore
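Note for readers: the MS_REG_CPU_KERNEL_T call above binds the Pull primitive to PullKernel<float> for a uint64 key tensor plus a float32 parameter tensor. The pull contract it relies on (fetch the latest weights for a key into a caller-provided buffer) can be pictured with a small self-contained mock; the MockKvStore name and its Store/Pull methods below are invented for illustration and are not the MindSpore Worker<T> API.

#include <algorithm>
#include <cstddef>
#include <cstring>
#include <map>
#include <vector>

// Hypothetical stand-in for a parameter-server key/value store; it only
// illustrates the pull semantics used by PullKernel::Launch.
class MockKvStore {
 public:
  void Store(size_t key, const std::vector<float> &weights) { table_[key] = weights; }

  // Analogous in spirit to Worker<T>::Pull(key, addr, size): copy up to `size`
  // bytes of the latest weights for `key` into the caller's buffer.
  void Pull(size_t key, void *addr, size_t size) const {
    const std::vector<float> &weights = table_.at(key);
    std::memcpy(addr, weights.data(), std::min(size, weights.size() * sizeof(float)));
  }

 private:
  std::map<size_t, std::vector<float>> table_;
};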
New file: kernel/cpu/ps/pull_kernel.h
@@ -0,0 +1,85 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_
#define MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_

#include <vector>
#include <string>
#include "parallel/ps/worker.h"
#include "parallel/ps/util.h"
#include "kernel/cpu/cpu_kernel.h"
#include "kernel/cpu/cpu_kernel_factory.h"

namespace mindspore {
namespace kernel {
template <typename T>
class PullKernel : public CPUKernel {
 public:
  PullKernel() : keys_size_(sizeof(size_t)), var_size_(sizeof(size_t)) {}
  ~PullKernel() override = default;

  bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, const std::vector<AddressPtr> &) {
    // If the parameter is an embedding table, don't Pull from PServer.
    if (param_name_.find("embedding") == std::string::npos && param_name_.find("wide_w") == std::string::npos) {
      parallel::ps::Worker<T>::GetInstance().Pull(key_, inputs[1]->addr, inputs[1]->size);
    }
    return true;
  }
  void Init(const CNodePtr &kernel_node) {
    size_t input_num = AnfAlgo::GetInputTensorNum(kernel_node);
    if (input_num != 2) {
      MS_LOG(ERROR) << "Input number is " << input_num << ", but pull needs 2 inputs.";
      return;
    }

    auto key_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 0);
    for (size_t i = 0; i < key_shape.size(); i++) {
      keys_size_ *= key_shape[i];
    }
    auto var_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 1);
    for (size_t i = 0; i < var_shape.size(); i++) {
      var_size_ *= var_shape[i];
    }
    auto param_node = AnfAlgo::GetInputNode(kernel_node, 1);
    MS_EXCEPTION_IF_NULL(param_node);
    param_name_ = param_node->fullname_with_scope();

    if (mindspore::parallel::ps::Util::IsRoleOfWorker()) {
      key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey);
    }
    InitSizeLists();
    return;
  }
  void InitKernel(const CNodePtr &kernel_node) { return; }

 protected:
  void InitSizeLists() {
    input_size_list_.push_back(keys_size_);
    input_size_list_.push_back(var_size_);
    output_size_list_.push_back(0);
  }

 private:
  size_t key_;
  size_t keys_size_;
  size_t var_size_;
  std::string param_name_;
};
}  // namespace kernel
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_
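A note on the size bookkeeping in Init() above: each entry pushed into input_size_list_ starts from an element size and is multiplied by every dimension of the corresponding input shape, i.e. it is a byte length. A minimal standalone sketch of that computation (plain C++; TensorByteSize is an invented name, not MindSpore code):

#include <cstddef>
#include <cstdio>
#include <vector>

// Byte size of a dense tensor: element size times the product of its dimensions,
// mirroring the accumulation loops in PullKernel::Init.
size_t TensorByteSize(size_t element_size, const std::vector<size_t> &shape) {
  size_t size = element_size;
  for (size_t dim : shape) {
    size *= dim;
  }
  return size;
}

int main() {
  // A [2, 3] tensor of 4-byte elements occupies 4 * 2 * 3 = 24 bytes.
  std::printf("%zu\n", TensorByteSize(4, {2, 3}));
  return 0;
}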
New file: kernel/cpu/ps/push_kernel.cc
@@ -0,0 +1,38 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "kernel/cpu/ps/push_kernel.h"

namespace mindspore {
namespace kernel {
MS_REG_CPU_KERNEL_T(Push,
                    KernelAttr()
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeFloat32)
                      .AddInputAttr(kNumberTypeInt32)
                      .AddOutputAttr(kNumberTypeUInt64),
                    PushKernel, float);

MS_REG_CPU_KERNEL_T(
  Push, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeUInt64),
  PushKernel, float);
}  // namespace kernel
}  // namespace mindspore
New file: kernel/cpu/ps/push_kernel.h
@@ -0,0 +1,80 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_
#define MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_

#include <vector>
#include <algorithm>
#include "parallel/ps/worker.h"
#include "parallel/ps/util.h"
#include "kernel/cpu/cpu_kernel.h"
#include "kernel/cpu/cpu_kernel_factory.h"

namespace mindspore {
namespace kernel {
template <typename T>
class PushKernel : public CPUKernel {
 public:
  PushKernel() : key_(UINT64_MAX) {}
  ~PushKernel() override = default;

  bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &,
              const std::vector<AddressPtr> &outputs) {
    std::vector<size_t> keys;
    std::vector<uintptr_t> addrs;
    std::vector<int> sizes;
    for (auto input : inputs) {
      keys.push_back(key_);
      addrs.push_back(reinterpret_cast<uintptr_t>(input->addr));
      sizes.push_back(SizeToInt(input->size) / sizeof(T));
    }
    parallel::ps::Worker<T>::GetInstance().Push(keys, addrs, sizes);
    memcpy(outputs[0]->addr, &key_, sizeof(size_t));
    return true;
  }

  void Init(const CNodePtr &kernel_node) {
    key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey);
    auto optim_input_shapes = AnfAlgo::GetNodeAttr<std::vector<std::vector<int>>>(kernel_node, "optim_input_shapes");
    std::vector<int> only_shape_indices = AnfAlgo::GetNodeAttr<std::vector<int>>(kernel_node, "only_shape_indices");
    MS_LOG(INFO) << "Key " << key_ << " optimizer input shapes are:" << optim_input_shapes;
    MS_LOG(INFO) << "Only init shape indices are " << only_shape_indices;
    for (size_t i = 0; i < optim_input_shapes.size(); i++) {
      auto shape = optim_input_shapes[i];
      mindspore::parallel::ps::Worker<float>::GetInstance().SetOptimInputShapes(key_, shape);
      if (std::count(only_shape_indices.begin(), only_shape_indices.end(), i) == 0) {
        size_t size = sizeof(T);
        for (size_t j = 0; j < shape.size(); j++) {
          size *= shape[j];
        }
        input_size_list_.push_back(size);
      }
    }

    output_size_list_.push_back(sizeof(size_t));
    return;
  }

  void InitKernel(const CNodePtr &kernel_node) { return; }

 private:
  size_t key_;
};
}  // namespace kernel
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_
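The core of PushKernel::Launch above is assembling three parallel vectors (the same parameter-server key repeated once per input, each input's raw address, and its length in elements of T) before handing them to Worker<T>::Push. A standalone sketch of that assembly pattern (the Buffer struct and BuildPushPayload helper are hypothetical stand-ins, not the MindSpore AddressPtr type):

#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for an input buffer: raw address plus length in bytes.
struct Buffer {
  void *addr;
  size_t size;
};

// Mirrors the loop in PushKernel::Launch: tag every input with the same
// parameter-server key, record its address, and convert bytes to element count.
template <typename T>
void BuildPushPayload(size_t key, const std::vector<Buffer> &inputs, std::vector<size_t> *keys,
                      std::vector<uintptr_t> *addrs, std::vector<int> *sizes) {
  for (const Buffer &input : inputs) {
    keys->push_back(key);
    addrs->push_back(reinterpret_cast<uintptr_t>(input.addr));
    sizes->push_back(static_cast<int>(input.size / sizeof(T)));
  }
}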