From 52022c80132b9bf3331fcbcda01ea0241c385d4a Mon Sep 17 00:00:00 2001 From: ZPaC Date: Mon, 13 Jul 2020 22:06:54 +0800 Subject: [PATCH] Enable to train in parameter server mode --- cmake/external_libs/glog.cmake | 2 +- cmake/options.cmake | 4 + .../backend/kernel_compiler/CMakeLists.txt | 19 +-- .../cpu/embedding_look_up_cpu_kernel.h | 2 +- .../cpu/ps/embedding_look_up_proxy_kernel.cc | 6 +- .../cpu/ps/embedding_look_up_ps_kernel.cc | 2 +- .../kernel_compiler/cpu/ps/push_kernel.cc | 8 ++ .../kernel_compiler/cpu/ps/push_kernel.h | 2 +- .../cpu/ps/sparse_apply_adam_ps_kernel.cc | 2 +- .../cpu/ps/sparse_apply_ftrl_ps_kernel.cc | 2 +- .../optimizer/pass/replace_node_by_proxy.cc | 1 - .../ccsrc/backend/session/ascend_session.cc | 10 ++ .../ccsrc/backend/session/cpu_session.cc | 29 ++++ mindspore/ccsrc/backend/session/cpu_session.h | 1 + .../ccsrc/backend/session/gpu_session.cc | 8 ++ .../ccsrc/backend/session/session_basic.cc | 92 ++++++++++++ .../ccsrc/backend/session/session_basic.h | 6 +- .../ccsrc/frontend/parallel/CMakeLists.txt | 9 +- .../frontend/parallel/ps/optimizer_info.cc | 15 +- .../frontend/parallel/ps/optimizer_info.h | 5 +- .../parallel/ps/optimizer_info_builder.cc | 25 ++-- .../parallel/ps/optimizer_info_builder.h | 2 +- .../frontend/parallel/ps/parameter_server.h | 39 +++-- mindspore/ccsrc/frontend/parallel/ps/worker.h | 11 +- .../ccsrc/frontend/parallel/ps/worker_proxy.h | 134 ++++++++---------- .../ccsrc/minddata/dataset/CMakeLists.txt | 6 + mindspore/ccsrc/pipeline/jit/action.cc | 46 +++++- mindspore/ccsrc/pipeline/jit/action.h | 5 + mindspore/ccsrc/pipeline/jit/pipeline.cc | 26 ++++ 29 files changed, 377 insertions(+), 142 deletions(-) diff --git a/cmake/external_libs/glog.cmake b/cmake/external_libs/glog.cmake index d7942a4efd7..f372c8e3c2f 100644 --- a/cmake/external_libs/glog.cmake +++ b/cmake/external_libs/glog.cmake @@ -1,4 +1,4 @@ -set(glog_CXXFLAGS "-D_FORTIFY_SOURCE=2 -O2 ${SECURE_CXX_FLAGS}") +set(glog_CXXFLAGS "-D_FORTIFY_SOURCE=2 -O2 ${SECURE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0") set(glog_CFLAGS "-D_FORTIFY_SOURCE=2 -O2") mindspore_add_pkg(glog VER 0.4.0 diff --git a/cmake/options.cmake b/cmake/options.cmake index 2470c25a90c..84ac3f611db 100644 --- a/cmake/options.cmake +++ b/cmake/options.cmake @@ -123,3 +123,7 @@ endif() if(ENABLE_DEBUGGER) add_compile_definitions(ENABLE_DEBUGGER) endif() + +if(ENABLE_TESTCASES) + add_compile_definitions(ENABLE_TESTCASES) +endif() \ No newline at end of file diff --git a/mindspore/ccsrc/backend/kernel_compiler/CMakeLists.txt b/mindspore/ccsrc/backend/kernel_compiler/CMakeLists.txt index b412d83d116..3201cec92e6 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/CMakeLists.txt +++ b/mindspore/ccsrc/backend/kernel_compiler/CMakeLists.txt @@ -26,14 +26,6 @@ if (ENABLE_CPU) "cpu/*.cc" ) - list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc" - "cpu/ps/pull_kernel.cc" - "cpu/ps/embedding_look_up_ps_kernel.cc" - "cpu/ps/embedding_look_up_proxy_kernel.cc" - "cpu/ps/apply_momentum_ps_kernel.cc" - "cpu/ps/sparse_apply_adam_ps_kernel.cc" - "cpu/ps/sparse_apply_ftrl_ps_kernel.cc") - if (NOT ENABLE_MPI) list(REMOVE_ITEM CPU_SRC_LIST "cpu/allgather_cpu_kernel.cc") list(REMOVE_ITEM CPU_SRC_LIST "cpu/reduce_scatter_cpu_kernel.cc") @@ -41,6 +33,17 @@ if (ENABLE_CPU) endif () endif () +if (${CMAKE_SYSTEM_NAME} MATCHES "Windows" OR ENABLE_GE) + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/apply_momentum_ps_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/embedding_look_up_proxy_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST 
"cpu/ps/embedding_look_up_ps_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/pserver_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/pull_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_adam_ps_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_ftrl_ps_kernel.cc") +endif() + if (ENABLE_GPU) file(GLOB_RECURSE CUDA_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "gpu/*.cu" diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/embedding_look_up_cpu_kernel.h b/mindspore/ccsrc/backend/kernel_compiler/cpu/embedding_look_up_cpu_kernel.h index 6c61ee346c4..5cced70cdeb 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/embedding_look_up_cpu_kernel.h +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/embedding_look_up_cpu_kernel.h @@ -46,7 +46,7 @@ class EmbeddingLookUpCPUKernel : public CPUKernel { bool Launch(const std::vector &inputs, const std::vector &workspace, const std::vector &outputs) override; - private: + protected: void LookUpTable(const std::vector &inputs, size_t dim0, size_t dim1, size_t dim2, float **output_addr); void CheckParam(const CNodePtr &kernel_node); diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc index 59ab65014be..2d986ff26a4 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc @@ -53,15 +53,15 @@ bool EmbeddingLookUpProxyKernel::Launch(const std::vector &i size_t output_size = outputs[0]->size; size_t size = input_size / sizeof(float); - ::ps::SArray lookup_ids(size, 0); + ::ps::SArray lookup_ids(size, 0); ::ps::SArray lengths{size}; - ::ps::SArray lookup_result; + ::ps::SArray lookup_result(output_size / sizeof(float), 0); auto ret = memcpy_s(lookup_ids.data(), input_size, indices_addr, input_size); if (ret != EOK) { MS_LOG(EXCEPTION) << "Lookup id memcpy failed."; } - parallel::ps::Worker::GetInstance().DoPSEmbeddingLookup({key_}, lookup_ids, lengths, lookup_result, + parallel::ps::Worker::GetInstance().DoPSEmbeddingLookup({key_}, lookup_ids, lengths, &lookup_result, parallel::ps::kEmbeddingLookupCmd); auto ret2 = memcpy_s(output_addr, output_size, lookup_result.data(), output_size); diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.cc index bcb3ca8ae8a..13a84c34b8d 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.cc @@ -50,7 +50,7 @@ void EmbeddingLookUpPSKernel::InitKernel( split_num_ = pserver_num_; // input shape should be sharded after computing offset_; - Shard(input_shape_, axis_); + Shard(&input_shape_, axis_); size_t output_size = std::accumulate(output_shape_.begin(), output_shape_.end(), sizeof(float), std::multiplies()); diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.cc index 96c1f15bda7..2322d4ee3a0 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.cc @@ -34,5 +34,13 @@ MS_REG_CPU_KERNEL_T(Push, MS_REG_CPU_KERNEL_T( Push, 
KernelAttr().AddInputAttr(kNumberTypeFloat32).AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeUInt64), PushKernel, float); + +MS_REG_CPU_KERNEL_T(Push, + KernelAttr() + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddInputAttr(kNumberTypeFloat32) + .AddOutputAttr(kNumberTypeUInt64), + PushKernel, float); } // namespace kernel } // namespace mindspore diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h index 938792f3bfd..d5876bd4611 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h @@ -43,7 +43,7 @@ class PushKernel : public CPUKernel { sizes.push_back(SizeToInt(input->size) / sizeof(T)); } parallel::ps::Worker::GetInstance().Push(keys, addrs, sizes); - memcpy(outputs[0]->addr, &key_, sizeof(size_t)); + memcpy_s(outputs[0]->addr, sizeof(size_t), &key_, sizeof(size_t)); return true; } diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc index c7283954f89..fa91f459472 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc @@ -75,7 +75,7 @@ void SparseApplyAdamPSKernel::ReInit(const std::shared_ptr &inputs) { const auto &indices_addr = inputs[10]; - indices_size_ = indices_addr->size; + indices_size_ = indices_addr->size / sizeof(int); workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float); workspace_size_list_[1] = indices_size_ * sizeof(int); } diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc index 0392bd5a696..93cd38c11b5 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc @@ -64,7 +64,7 @@ void SparseApplyFtrlPSKernel::ReInit(const std::shared_ptr &inputs) { const auto &indices_addr = inputs[4]; - indices_size_ = indices_addr->size; + indices_size_ = indices_addr->size / sizeof(int); workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float); workspace_size_list_[1] = indices_size_ * sizeof(int); } diff --git a/mindspore/ccsrc/backend/optimizer/pass/replace_node_by_proxy.cc b/mindspore/ccsrc/backend/optimizer/pass/replace_node_by_proxy.cc index cd34464cda8..53faa131b1f 100644 --- a/mindspore/ccsrc/backend/optimizer/pass/replace_node_by_proxy.cc +++ b/mindspore/ccsrc/backend/optimizer/pass/replace_node_by_proxy.cc @@ -71,7 +71,6 @@ bool ReplaceNodeByProxy::Run(const FuncGraphPtr &func_graph) { AbstractBasePtrList abstract_list; AnfAlgo::CopyNodeAttr(kAttrPsKey, cnode, proxy_node); - AnfAlgo::CopyNodeAttr("reduce_scatter_flag", cnode, proxy_node); AnfAlgo::CopyNodeAttr("offset", cnode, proxy_node); abstract_list.push_back(cnode->abstract()); auto abstract_tuple = std::make_shared(abstract_list); diff --git a/mindspore/ccsrc/backend/session/ascend_session.cc b/mindspore/ccsrc/backend/session/ascend_session.cc index 75bc4e2d058..766647d8a3a 100644 --- a/mindspore/ccsrc/backend/session/ascend_session.cc +++ b/mindspore/ccsrc/backend/session/ascend_session.cc @@ -353,6 +353,10 @@ GraphId AscendSession::CompileGraph(NotNull func_graph) { 
RootGraphExecutorValidate(NOT_NULL(root_graph)); // adjust kernel AdjustKernel(root_graph); +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + // Assign parameter keys. + AssignParamKey(root_graph); +#endif // assign stream AssignStream(NOT_NULL(root_graph)); // insert profiling point @@ -511,6 +515,12 @@ void AscendSession::RunGraph(const GraphId &graph_id, const std::vector &kernel_graph) { + auto optimizer = std::make_shared(); + auto pm = std::make_shared(); + std::string pass_name = "replace_node_by_proxy"; + pass_name.append(std::to_string(graph_sum_)); + pm->AddPass(std::make_shared(pass_name)); + optimizer->AddPassManager(pm); + (void)optimizer->Optimize(kernel_graph); + kernel_graph->SetExecOrderByDefault(); +} + GraphId CPUSession::CompileGraph(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { auto graph_id = graph_sum_; auto graph = ConstructKernelGraph(lst, outputs); MS_EXCEPTION_IF_NULL(graph); MS_LOG(INFO) << "Set kernel info"; SetKernelInfo(graph.get()); +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + AssignParamKey(graph); + if (parallel::ps::Util::IsRoleOfWorker()) { + Optimize(graph); + } +#endif predictmodel::StepConvertGraph(graph); MS_LOG(INFO) << "Build kernel"; BuildKernel(graph.get()); @@ -66,6 +89,12 @@ GraphId CPUSession::CompileGraph(const AnfNodePtrList &lst, const AnfNodePtrList void CPUSession::RunGraph(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) { auto &kernel_graph = graphs_[graph_id]; MS_EXCEPTION_IF_NULL(kernel_graph); +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + // Initialize parameter server + if (!ps_init_) { + InitPSParamAndOptim(kernel_graph, inputs); + } +#endif MS_LOG(INFO) << "Bind input output address"; std::vector need_sync_outputs; runtime_.BindInputOutput(kernel_graph.get(), inputs, outputs, &need_sync_outputs); diff --git a/mindspore/ccsrc/backend/session/cpu_session.h b/mindspore/ccsrc/backend/session/cpu_session.h index b0dbd1cc2bb..8a94c21828a 100644 --- a/mindspore/ccsrc/backend/session/cpu_session.h +++ b/mindspore/ccsrc/backend/session/cpu_session.h @@ -37,6 +37,7 @@ class CPUSession : public SessionBasic { protected: ParameterPtr CreateNewParameterFromParameter(const AnfNodePtr &anf, bool valid_input, KernelGraph *graph) override; + void Optimize(const std::shared_ptr &kernel_graph); private: void SetKernelInfo(const KernelGraph *kernel_graph); diff --git a/mindspore/ccsrc/backend/session/gpu_session.cc b/mindspore/ccsrc/backend/session/gpu_session.cc index 14e30c1a443..c8644a5916d 100644 --- a/mindspore/ccsrc/backend/session/gpu_session.cc +++ b/mindspore/ccsrc/backend/session/gpu_session.cc @@ -167,6 +167,10 @@ GraphId GPUSession::CompileGraph(const AnfNodePtrList &lst, const AnfNodePtrList Optimize(graph); // Select kernel build info SelectKernel(graph); +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + // Assign parameter keys. 
+ AssignParamKey(graph); +#endif // Convert kernel Graph to model predictmodel::StepConvertGraph(graph); // Start gpu kernel runtime @@ -204,6 +208,10 @@ void GPUSession::RunGraph(const GraphId &graph_id, const std::vector &node_list) { + MS_EXCEPTION_IF_NULL(push_node); + for (auto &node : node_list) { + if (node != nullptr && node->isa()) { + for (auto input : node->cast()->inputs()) { + if (push_node == AnfAlgo::VisitKernel(input, 0).first) { + if (AnfAlgo::GetCNodeName(node) != kPullOpName) { + MS_LOG(EXCEPTION) << "The edge between Push and Pull node is invalid."; + } + return node; + } + } + } + } + return nullptr; +} + +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) +void SessionBasic::AssignParamKey(const KernelGraphPtr &kernel_graph) { + if (!parallel::ps::Util::IsRoleOfWorker()) { + MS_LOG(INFO) << "Not parameter server mode."; + return; + } + MS_EXCEPTION_IF_NULL(kernel_graph); + std::vector node_list = TopoSort(kernel_graph->get_return()); + for (auto &node : node_list) { + if (node != nullptr && node->isa()) { + // Assign key for forward kernel EmbeddingLookup. + // The key will be assigned to embedding table ande Push kernel as well. + if (AnfAlgo::GetCNodeName(node) == kEmbeddingLookupOpName) { + size_t embedding_table_idx = 0; + auto embedding_table = AnfAlgo::GetInputNode(node->cast(), embedding_table_idx); + size_t key = parallel::ps::Worker::GetInstance().SetParamKey(embedding_table->fullname_with_scope()); + AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), node); + } else if (AnfAlgo::GetCNodeName(node) == kPushOpName) { + auto pull_node = FindPullNode(node, node_list); + if (!pull_node) { + MS_LOG(EXCEPTION) << "Assigning parameter key failed: can't find Pull node of the Push node."; + } + + // Second input of Pull node is the trainable parameter. 
+ size_t parameter_index = 1; + auto parameter_node = AnfAlgo::GetInputNode(pull_node->cast(), parameter_index); + size_t key = parallel::ps::Worker::GetInstance().SetParamKey(parameter_node->fullname_with_scope()); + AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), node); + AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), pull_node); + + std::string optimizer_name = AnfAlgo::GetNodeAttr(node, kAttrOptimizerType); + parallel::ps::Worker::GetInstance().SetKeyOptimId(key, optimizer_name); + } + } + } +} + +void SessionBasic::InitPSParamAndOptim(const KernelGraphPtr &kernel_graph, + const std::vector &inputs_const) { + if (!parallel::ps::Util::IsRoleOfWorker()) { + return; + } + std::vector inputs(inputs_const); + size_t input_ctrl_size = 1; + MS_EXCEPTION_IF_NULL(kernel_graph); + if (kernel_graph->input_ctrl_tensors()) { + input_ctrl_size = LoadCtrlInputTensor(kernel_graph, &inputs); + } + auto input_nodes = kernel_graph->inputs(); + if ((inputs.size() + input_ctrl_size) - 1 != input_nodes.size()) { + MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size() + << ", input_ctrl_size:" << input_ctrl_size; + } + auto ms_context = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(ms_context); + for (size_t i = 0; i < inputs.size(); ++i) { + auto tensor = inputs[i]; + MS_EXCEPTION_IF_NULL(tensor); + auto input_node = input_nodes[i]; + MS_EXCEPTION_IF_NULL(input_node); + if (input_node->isa() && AnfAlgo::OutputAddrExist(input_node, 0)) { + auto pk_node = input_node->cast(); + mindspore::parallel::ps::Worker::GetInstance().InitPSParamAndOptim( + pk_node->fullname_with_scope(), tensor->data_c(), LongToSize(tensor->data().nbytes())); + } + } + ps_init_ = true; +} +#endif } // namespace session } // namespace mindspore diff --git a/mindspore/ccsrc/backend/session/session_basic.h b/mindspore/ccsrc/backend/session/session_basic.h index c662e3978bd..a8ef0a7e1e3 100755 --- a/mindspore/ccsrc/backend/session/session_basic.h +++ b/mindspore/ccsrc/backend/session/session_basic.h @@ -51,7 +51,7 @@ using OpRunInfoPtr = std::shared_ptr; class SessionBasic { public: - SessionBasic() : context_(nullptr), summary_callback_(nullptr), device_id_(0) { + SessionBasic() : context_(nullptr), summary_callback_(nullptr), device_id_(0), ps_init_(false) { #ifdef ENABLE_DEBUGGER debugger_ = nullptr; #endif @@ -104,6 +104,8 @@ class SessionBasic { virtual GraphId GetFinalRunGraph() const { return kInvalidGraphId; } virtual void SetActive(GraphId, GraphId) {} virtual void GetSummaryNodes(KernelGraph *graph); + void AssignParamKey(const KernelGraphPtr &kernel_graph); + void InitPSParamAndOptim(const KernelGraphPtr &kernel_graph, const std::vector &inputs_const); #ifdef ENABLE_DEBUGGER // set debugger @@ -140,6 +142,7 @@ class SessionBasic { AnfNodePtr CreateNewParameterFromCNode(const AnfNodePtr &anf, bool valid_input, KernelGraph *graph); void AddParameterToGraphInputs(const std::vector ¶meters, KernelGraph *graph); void InitInternalOutputParameter(const AnfNodePtr &out_node, const AnfNodePtr ¶meter); + AnfNodePtr FindPullNode(const AnfNodePtr &push_node, const std::vector &node_list); std::unordered_map> graphs_; std::unordered_map> run_op_graphs_; @@ -148,6 +151,7 @@ class SessionBasic { CallBackFunc summary_callback_; static GraphId graph_sum_; uint32_t device_id_; + bool ps_init_; #ifdef ENABLE_DEBUGGER std::shared_ptr debugger_; #endif diff --git a/mindspore/ccsrc/frontend/parallel/CMakeLists.txt b/mindspore/ccsrc/frontend/parallel/CMakeLists.txt index 
d2a099cf415..0f667791467 100644 --- a/mindspore/ccsrc/frontend/parallel/CMakeLists.txt +++ b/mindspore/ccsrc/frontend/parallel/CMakeLists.txt @@ -1,5 +1,12 @@ file(GLOB_RECURSE _PARALLEL_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") -list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/util.cc" "ps/scheduler.cc" "ps/optimizer_info.cc" "ps/optimizer_info_builder.cc") + +if (${CMAKE_SYSTEM_NAME} MATCHES "Windows" OR ENABLE_GE) + list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/optimizer_info_builder.cc") + list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/optimizer_info.cc") + list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/scheduler.cc") + list(REMOVE_ITEM _PARALLEL_SRC_FILES "ps/util.cc") +endif() + if (ENABLE_DUMP_PROTO) list(REMOVE_ITEM _PARALLEL_SRC_FILES "parallel/strategy_checkpoint/parallel_strategy_checkpoint.cc") endif () diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc index e16c713e3c7..cbfa5829837 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc +++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc @@ -118,11 +118,13 @@ const AddressPtr &MomentumOptimInfo::gradient() { return inputs_[3]; } const AddressPtr &MomentumOptimInfo::indices() { return inputs_[3]; } +size_t MomentumOptimInfo::grad_index() { return 1; } + SparseAdamOptimInfo::SparseAdamOptimInfo(const AddressPtr &weight, const AddressPtr &m, const AddressPtr &v, const AddressPtr &beta1_power, const AddressPtr &beta2_power, const AddressPtr &learning_rate, const AddressPtr &beta1, const AddressPtr &beta2, const AddressPtr &epsilon, const AddressPtr &grad, - const AddressPtr &indices, size_t grads_offset, size_t indices_offset) { + const AddressPtr &indices) { inputs_.push_back(weight); inputs_.push_back(m); inputs_.push_back(v); @@ -134,8 +136,8 @@ SparseAdamOptimInfo::SparseAdamOptimInfo(const AddressPtr &weight, const Address inputs_.push_back(epsilon); inputs_.push_back(grad); inputs_.push_back(indices); - grads_offset_ = grads_offset; - indices_offset_ = indices_offset; + grads_offset_ = 0; + indices_offset_ = 0; } void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) { @@ -159,15 +161,14 @@ size_t SparseAdamOptimInfo::grad_index() { return 6; } size_t SparseAdamOptimInfo::indices_index() { return 7; } SparseFtrlOptimInfo::SparseFtrlOptimInfo(const AddressPtr &weight, const AddressPtr &accum, const AddressPtr &linear, - const AddressPtr &grad, const AddressPtr &indices, size_t grads_offset, - size_t indices_offset) { + const AddressPtr &grad, const AddressPtr &indices) { inputs_.push_back(weight); inputs_.push_back(accum); inputs_.push_back(linear); inputs_.push_back(grad); inputs_.push_back(indices); - grads_offset_ = grads_offset; - indices_offset_ = indices_offset; + grads_offset_ = 0; + indices_offset_ = 0; } const AddressPtr &SparseFtrlOptimInfo::gradient() { return inputs_[3]; } diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h index bb9a64acdb3..ada020a95a9 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h +++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h @@ -81,6 +81,7 @@ class MomentumOptimInfo : public DenseOptimInfo { const AddressPtr &gradient(); const AddressPtr &indices(); + size_t grad_index() override; }; class SparseAdamOptimInfo : public SparseOptimInfo { @@ -88,7 +89,7 @@ class SparseAdamOptimInfo : public SparseOptimInfo { SparseAdamOptimInfo(const AddressPtr &weight, const AddressPtr &m, const AddressPtr 
&v, const AddressPtr &beta1_power, const AddressPtr &beta2_power, const AddressPtr &learning_rate, const AddressPtr &beta1, const AddressPtr &beta2, const AddressPtr &epsilon, const AddressPtr &grad, - const AddressPtr &indices, size_t grads_offset, size_t indices_offset); + const AddressPtr &indices); ~SparseAdamOptimInfo() override = default; void Update(const Values &values, const Lengths &lens) override; @@ -102,7 +103,7 @@ class SparseAdamOptimInfo : public SparseOptimInfo { class SparseFtrlOptimInfo : public SparseOptimInfo { public: SparseFtrlOptimInfo(const AddressPtr &weight, const AddressPtr &accum, const AddressPtr &linear, - const AddressPtr &grad, const AddressPtr &indices, size_t grads_offset, size_t indices_offset); + const AddressPtr &grad, const AddressPtr &indices); ~SparseFtrlOptimInfo() override = default; const AddressPtr &gradient(); diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc index 159a50793e1..7b6686ea869 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc +++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc @@ -48,20 +48,25 @@ OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, co size_t worker_num) { AddressPtr weight_addr = std::make_shared(); weight_addr->addr = weight->data(); - weight_addr->size = weight->size(); + weight_addr->size = weight->size() * sizeof(float); void *data_ptr = values.data(); + void *copy_data_ptr = new float[values.size()]; + auto ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float)); + if (ret != 0) { + MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")"; + } AddressPtr accumulate = std::make_shared(); accumulate->addr = new float[weight->size()]; - accumulate->size = weight->size(); + accumulate->size = weight->size() * sizeof(float); AddressPtr learning_rate = std::make_shared(); - learning_rate->addr = data_ptr; - learning_rate->size = lens[0]; + learning_rate->addr = copy_data_ptr; + learning_rate->size = lens[0] * sizeof(float); AddressPtr gradient = std::make_shared(); gradient->addr = reinterpret_cast(learning_rate->addr) + lens[0]; - gradient->size = lens[1]; + gradient->size = lens[1] * sizeof(float); AddressPtr momentum = std::make_shared(); momentum->addr = reinterpret_cast(gradient->addr) + lens[1]; - momentum->size = lens[2]; + momentum->size = lens[2] * sizeof(float); return new MomentumOptimInfo(weight_addr, accumulate, learning_rate, gradient, momentum); } @@ -131,10 +136,10 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight, if (ret3 != 0) { MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret3 << ")"; } - indices->size = lens[7] * sizeof(float); + indices->size = lens[7] * sizeof(int); return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon, - grad, indices, total_grad_size, total_indice_size); + grad, indices); } OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, @@ -175,9 +180,9 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight, if (ret2 != 0) { MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")"; } - indices->size = lens[1] * sizeof(float); + indices->size = lens[1] * sizeof(int); - return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices, total_grad_size, total_indice_size); + return 
new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices); } } // namespace ps } // namespace parallel diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h index c5aae32921b..5a12799775e 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h +++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h @@ -19,7 +19,7 @@ #include #include #include "backend/kernel_compiler/kernel.h" -#include "backend/kernel_compiler/ps/pserver_kernel.h" +#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h" #include "frontend/parallel/ps/optimizer_info.h" namespace mindspore { diff --git a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h index 1afb4c9fa65..56c9e34879e 100755 --- a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h +++ b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h @@ -40,12 +40,12 @@ #include "runtime/device/cpu/kernel_select_cpu.h" #include "utils/context/ms_context.h" #include "backend/kernel_compiler/kernel.h" -#include "backend/kernel_compiler/ps/pserver_kernel.h" #include "backend/kernel_compiler/cpu/cpu_kernel_factory.h" -#include "backend/kernel_compiler/ps/sparse_apply_adam_ps_kernel.h" -#include "backend/kernel_compiler/ps/sparse_apply_ftrl_ps_kernel.h" -#include "backend/kernel_compiler/ps/apply_momentum_ps_kernel.h" -#include "backend/kernel_compiler/ps/embedding_look_up_ps_kernel.h" +#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h" +#include "backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.h" +#include "backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.h" +#include "backend/kernel_compiler/cpu/ps/apply_momentum_ps_kernel.h" +#include "backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.h" namespace mindspore { namespace parallel { @@ -118,7 +118,7 @@ class ParameterServer { std::shared_ptr kernel_graph_; std::shared_ptr sess_; - std::unordered_map> optimizers_; + std::unordered_map> optimizers_; std::unordered_map optim_inputs_shape_; std::unordered_map> optim_infos_; std::unordered_map> optim_info_builders_; @@ -249,10 +249,10 @@ template void ParameterServer::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta &req_meta, const ::ps::KVPairs &req_data, ::ps::KVPairs *res) { const Key &key = req_data.keys[0]; - ps_->DoEmbeddingLookup(key, req_data.vals, res); for (size_t i = 0; i < req_data.vals.size(); i++) { - res->keys->push_back(req_data.vals[i]); + res->keys.push_back(req_data.vals[i]); } + ps_->DoEmbeddingLookup(key, req_data.vals, res); } template @@ -288,7 +288,7 @@ void ParameterServer::InitOptimInfoBuilders() { template void ParameterServer::InitWeightKeyToOptims(const Key &key, const int &optim_id) { - if (weight_key_to_optims_.count(key) > 0 || Util::optimizer_name(key) == "") { + if (weight_key_to_optims_.count(key) > 0 || Util::optimizer_name(optim_id) == "") { return; } weight_key_to_optims_[key] = Util::optimizer_name(optim_id); @@ -314,22 +314,22 @@ void ParameterServer::InitOptimInputsShape(const Keys &keys, const Values &va } if (weight_key_to_optims_.count(key) > 0) { const std::string &optim_name = weight_key_to_optims_[key]; - if (optimizers_.count(optim_name) == 0 && optim_inputs_shape_.count(key) > 0) { + if (optimizers_.count(key) == 0 && optim_inputs_shape_.count(key) > 0) { if (optim_name == kSparseAdam) { std::shared_ptr optimizer = std::make_shared(rank_id_, pserver_num_); 
optimizer->InitKernel(optim_inputs_shape_[key]); - optimizers_[optim_name] = optimizer; + optimizers_[key] = optimizer; } else if (optim_name == kApplyMomentum) { std::shared_ptr optimizer = std::make_shared(rank_id_, pserver_num_); optimizer->InitKernel(optim_inputs_shape_[key]); - optimizers_[optim_name] = optimizer; + optimizers_[key] = optimizer; } else if (optim_name == kSparseFtrl) { std::shared_ptr optimizer = std::make_shared(rank_id_, pserver_num_); optimizer->InitKernel(optim_inputs_shape_[key]); - optimizers_[optim_name] = optimizer; + optimizers_[key] = optimizer; } } } @@ -382,8 +382,7 @@ void ParameterServer::UpdateWeights() { std::shared_ptr optimizer = nullptr; if (weight_key_to_optims_.count(key) > 0) { - const std::string &optim_name = weight_key_to_optims_[key]; - optimizer = optimizers_[optim_name]; + optimizer = optimizers_[key]; } MS_EXCEPTION_IF_NULL(optimizer); @@ -391,8 +390,6 @@ void ParameterServer::UpdateWeights() { if (optim_info == nullptr) { continue; } - const WeightPtr &weight = weights_[key]; - optim_info->UpdateWeight(weight); const std::vector &inputs = optim_info->inputs(); const std::vector &workspaces = optim_info->workspaces(); const std::vector &outputs = optim_info->outputs(); @@ -416,7 +413,7 @@ void ParameterServer::AccumGrad(const Keys &keys, const Values &values, const // Create or update the optimizer info if (optim_info == nullptr) { const std::shared_ptr &builder = optim_info_builders_[weight_key_to_optims_[key]]; - std::shared_ptr pserver_kernel = optimizers_[weight_key_to_optims_[key]]; + std::shared_ptr pserver_kernel = optimizers_[key]; if (pserver_kernel == nullptr) { MS_LOG(EXCEPTION) << "no optimizer found for key " << key << " optim name " << weight_key_to_optims_[key]; } @@ -427,10 +424,8 @@ void ParameterServer::AccumGrad(const Keys &keys, const Values &values, const optim_infos_[key] = optim_info; } else { optim_info->Update(values, lengths); + optim_info->Accumulate(values, lengths); } - MS_EXCEPTION_IF_NULL(optim_info); - - optim_info->Accumulate(values, lengths); grads_accum_counter_[key] += 1; if (grads_accum_counter_[key] == worker_num_) { @@ -499,7 +494,7 @@ void ParameterServer::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids, table_lookup_op->Execute(inputs, workspaces, outputs); res->vals = *addr; - res->lens.push_back(res.vals.size()); + res->lens.push_back(res->vals.size()); } template diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker.h b/mindspore/ccsrc/frontend/parallel/ps/worker.h index 9ecbc28fc51..13cfef4d9f8 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/worker.h +++ b/mindspore/ccsrc/frontend/parallel/ps/worker.h @@ -48,7 +48,7 @@ class Worker { void AddEmbeddingTable(const ::ps::Key &key, const size_t &row_count); void InitPSEmbeddingTable(const std::vector &keys, std::vector shapes, const std::vector &sizes); void InitPSParamAndOptim(const std::string ¶m_name, void *param_data, size_t param_size); - void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, + void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, const ::ps::SArray &lens, ::ps::SArray *lookup_result, int cmd); private: @@ -98,7 +98,8 @@ void Worker::Push(const std::vector &keys, std::vector add ::ps::SArray total_buffer(total_size, 0); size_t offset = 0; for (size_t i = 0; i < sizes.size(); i++) { - memcpy(total_buffer.data() + offset / sizeof(T), addrs[i], sizes[i] * sizeof(T)); + memcpy_s(total_buffer.data() + offset / sizeof(T), sizes[i] * sizeof(T), 
reinterpret_cast(addrs[i]), + sizes[i] * sizeof(T)); offset += sizes[i] * sizeof(T); } kv_worker_->PushData(::ps::SArray<::ps::Key>(keys), total_buffer, ::ps::SArray(sizes)); @@ -108,13 +109,13 @@ template void Worker::Pull(const size_t key, void *dev_addr, const size_t size) { ::ps::SArray variables(size / sizeof(T), 0); kv_worker_->Wait(kv_worker_->ZPull({key}, &variables)); - memcpy(dev_addr, variables.data(), size); + memcpy_s(dev_addr, size, variables.data(), size); } template -void Worker::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, +void Worker::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, const ::ps::SArray &lens, ::ps::SArray *lookup_result, int cmd) { - kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, &lookup_result, cmd); + kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd); } template diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h index a0f58d39a4a..6d68419383c 100644 --- a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h +++ b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "ps/ps.h" #include "frontend/parallel/ps/util.h" @@ -34,24 +35,23 @@ class WorkerProxy : public ::ps::KVWorker { using Worker = ::ps::KVWorker; using Callback = std::function; using SlicedKVs = std::vector>>; - using Slicer = - std::function &send, const std::vector<::ps::Range> &ranges, SlicedKVs *sliced)>; + using Slicer = std::function &send, const std::vector<::ps::Range> &ranges, + SlicedKVs *sliced)>; using ::ps::SimpleApp::obj_; explicit WorkerProxy(int app_id, int customer_id, int lookup_customer_id) : Worker(app_id, customer_id) { - using _1 = std::placeholders::_1; - using _2 = std::placeholders::_2; - using _3 = std::placeholders::_3; + using std::placeholders::_1; + using std::placeholders::_2; + using std::placeholders::_3; + using std::placeholders::_4; lookup_customer_ = std::unique_ptr<::ps::Customer>( new ::ps::Customer(app_id, lookup_customer_id, std::bind(&WorkerProxy::ProcessLookupResult, this, _1))); - lookup_slicer_ = std::bind(&WorkerProxy::LookupIdSlicer, this, _1, _2, _3); - init_embedding_slicer_ = std::bind(&WorkerProxy::EmbeddingTableInitSlicer, this, _1, _2, _3); - push_slicer_ = std::bind(&WorkerProxy::PushSlicer, this, _1, _2, _3); - broadcast_slicer_ = std::bind(&WorkerProxy::BroadcastSlicer, this, _1, _2, _3); + lookup_slicer_ = std::bind(&WorkerProxy::LookupIdSlicer, this, _1, _2, _3, _4); + broadcast_slicer_ = std::bind(&WorkerProxy::BroadcastSlicer, this, _1, _2, _3, _4); } ~WorkerProxy() override = default; void AddEmbeddingTable(const ::ps::Key &key, const size_t &row_count); - void EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, + void EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, const ::ps::SArray &lens, ::ps::SArray *outs, int cmd = 0, const Callback &cb = nullptr, int priority = 0); int InitEmbeddingTable(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &vals, @@ -61,15 +61,11 @@ class WorkerProxy : public ::ps::KVWorker { private: template - int AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, C *vals, int cmd, + int AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, C *vals, int cmd, const Callback &cb); - void LookupIdSlicer(const ::ps::KVPairs &send, const 
std::vector<::ps::Range> &, + void LookupIdSlicer(int timestamp, const ::ps::KVPairs &send, const std::vector<::ps::Range> &, std::vector>> *sliced); - void EmbeddingTableInitSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, - std::vector>> *sliced); - void PushSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, - std::vector>> *sliced); - void BroadcastSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, + void BroadcastSlicer(int timestamp, const ::ps::KVPairs &send, const std::vector<::ps::Range> &, std::vector>> *sliced); void ProcessLookupResult(const ::ps::Message &msg); void Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd, const ::ps::KVPairs &kvs, @@ -80,10 +76,9 @@ class WorkerProxy : public ::ps::KVWorker { std::unordered_map>> lookup_results_; std::mutex mutex_; Slicer lookup_slicer_; - Slicer init_embedding_slicer_; - Slicer push_slicer_; Slicer broadcast_slicer_; std::unordered_map lookup_callbacks_; + std::unordered_map expected_result_count_; }; template @@ -108,17 +103,21 @@ void WorkerProxy::AddEmbeddingTable(const ::ps::Key &key, const size_t &row_c } template -void WorkerProxy::EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, +void WorkerProxy::EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, const ::ps::SArray &lens, ::ps::SArray *outs, int cmd, const Callback &cb, int priority) { int ts = AddLookupCB(keys, lookup_ids, outs, cmd, cb); ::ps::KVPairs kvs; kvs.keys = keys; - kvs.vals = lookup_ids; - kvs.lens = lens; + kvs.lens = lookup_ids; kvs.priority = priority; - Send(lookup_customer_.get(), ts, true, true, cmd, kvs, broadcast_slicer_); + expected_result_count_[ts] = 0; + Send(lookup_customer_.get(), ts, true, true, cmd, kvs, lookup_slicer_); + int server_num = ::ps::NumServers(); + int expect_rt_count = expected_result_count_[ts]; + lookup_customer_->AddResponse(ts, server_num - expect_rt_count); lookup_customer_->WaitRequest(ts); + expected_result_count_.erase(ts); } template @@ -130,7 +129,7 @@ int WorkerProxy::InitEmbeddingTable(const ::ps::SArray<::ps::Key> &keys, cons kvs.vals = vals; kvs.lens = lens; kvs.priority = priority; - Send(obj_, ts, true, false, kInitEmbeddingsCmd, kvs, init_embedding_slicer_); + Send(obj_, ts, true, false, kInitEmbeddingsCmd, kvs, broadcast_slicer_); return ts; } @@ -143,13 +142,13 @@ void WorkerProxy::PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::S kvs.vals = vals; kvs.lens = lens; kvs.priority = priority; - Send(obj_, ts, true, false, cmd, kvs, push_slicer_); + Send(obj_, ts, true, false, cmd, kvs, broadcast_slicer_); obj_->WaitRequest(ts); } template template -int WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, +int WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray &lookup_ids, C *lookup_result, int cmd, const Callback &cb) { int ts = lookup_customer_->NewRequest(::ps::kServerGroup); const auto &callback = [this, ts, keys, lookup_ids, lookup_result, cb]() mutable { @@ -158,20 +157,30 @@ int WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps: mutex_.unlock(); size_t total_len = 0; - const auto &s = kvs[0]; - for (size_t i = 0; i < s.lens.size(); i++) { - total_len += s.lens[i]; - } - lookup_result->resize(total_len, 0); - T *result_addr = lookup_result->data(); - + std::unordered_map>> id_addr_map; for (const auto &s : kvs) { - size_t offset = 0; - for (size_t i = 0; i < 
s.vals.size(); i++) { - result_addr[offset++] += s.vals[i]; + int offset = 0; + int len = s.vals.size() / s.keys.size(); + for (size_t i = 0; i < s.keys.size(); i++) { + const Key &key = s.keys[i]; + T *addr = s.vals.data() + offset; + offset += len; + total_len += len; + id_addr_map[key] = std::make_shared>(std::make_pair(addr, len)); } } + T *result_addr = lookup_result->data(); + int offset = 0; + for (size_t i = 0; i < lookup_ids.size(); i++) { + auto &pair = id_addr_map[static_cast(lookup_ids[i])]; + auto ret = memcpy_s(result_addr + offset, pair->second, pair->first, pair->second); + if (ret != 0) { + MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")"; + } + offset += pair->second; + } + mutex_.lock(); lookup_results_.erase(ts); mutex_.unlock(); @@ -182,31 +191,30 @@ int WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps: } template -void WorkerProxy::LookupIdSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, +void WorkerProxy::LookupIdSlicer(int timestamp, const ::ps::KVPairs &send, const std::vector<::ps::Range> &, std::vector>> *sliced) { - int *data = send.lens.data(); - size_t size = send.lens.size(); - std::vector lookup_ids(data, data + size); - std::sort(lookup_ids.begin(), lookup_ids.end()); + int *lookup_ids = send.lens.data(); + size_t id_size = send.lens.size(); const Key &key = send.keys[0]; const std::vector<::ps::Range> &ranges = *(embedding_table_ranges_[key]); sliced->resize(ranges.size()); - size_t index = 0; for (size_t i = 0; i < ranges.size(); i++) { const ::ps::Range &range = ranges[i]; const auto &begin = range.begin(); const auto &end = range.end(); + std::unordered_set unique_ids; auto &kvs = sliced->at(i).second; - auto lookup_id = static_cast(lookup_ids[index]); - while (lookup_id >= begin && lookup_id <= end) { - kvs.vals.push_back(lookup_id); - if (++index >= lookup_ids.size()) { - break; + for (size_t j = 0; j < id_size; j++) { + auto lookup_id = static_cast(lookup_ids[j]); + if (lookup_id >= begin && lookup_id <= end) { + unique_ids.insert(lookup_id); } - lookup_id = static_cast(lookup_ids[index]); + } + for (const auto &lookup_id : unique_ids) { + kvs.vals.push_back(lookup_id); } kvs.keys.push_back(key); kvs.lens.push_back(kvs.vals.size()); @@ -215,35 +223,13 @@ void WorkerProxy::LookupIdSlicer(const ::ps::KVPairs &send, const std::vec sliced->at(i).first = false; } else { sliced->at(i).first = true; + expected_result_count_[timestamp] += 1; } } } template -void WorkerProxy::EmbeddingTableInitSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, - std::vector>> *sliced) { - const Key &key = send.keys[0]; - const std::vector<::ps::Range> &ranges = *(embedding_table_ranges_[key]); - sliced->resize(ranges.size()); - for (size_t i = 0; i < ranges.size(); i++) { - sliced->at(i).first = true; - sliced->at(i).second = send; - } -} - -template -void WorkerProxy::PushSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, - std::vector>> *sliced) { - auto server_num = ::ps::Postoffice::Get()->num_servers(); - sliced->resize(server_num); - for (int i = 0; i < server_num; i++) { - sliced->at(i).first = true; - sliced->at(i).second = send; - } -} - -template -void WorkerProxy::BroadcastSlicer(const ::ps::KVPairs &send, const std::vector<::ps::Range> &, +void WorkerProxy::BroadcastSlicer(int timestamp, const ::ps::KVPairs &send, const std::vector<::ps::Range> &, std::vector>> *sliced) { auto server_num = ::ps::Postoffice::Get()->num_servers(); sliced->resize(server_num); @@ -268,7 +254,7 @@ 
void WorkerProxy::ProcessLookupResult(const ::ps::Message &msg) { lookup_results_[ts].push_back(kvs); mutex_.unlock(); } - if (lookup_customer_->NumResponse(ts) == ::ps::Postoffice::Get()->num_servers() - 1) { + if (lookup_customer_->NumResponse(ts) == expected_result_count_[ts] - 1) { const auto &cb = lookup_callbacks_[ts]; cb(); lookup_callbacks_.erase(ts); @@ -279,7 +265,7 @@ template void WorkerProxy::Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd, const ::ps::KVPairs &kvs, const Slicer &slicer) { SlicedKVs sliced; - slicer(kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced); + slicer(timestamp, kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced); for (size_t i = 0; i < sliced.size(); i++) { const auto &s = sliced[i]; diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt index df9729c4ee1..168a4eb7b35 100644 --- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt @@ -146,6 +146,12 @@ if (${CMAKE_SYSTEM_NAME} MATCHES "Windows") target_link_libraries(_c_dataengine PRIVATE _c_mindrecord ${MINDRECORD_LINK_OBJECT} mindspore::sqlite) else() target_link_libraries(_c_dataengine PRIVATE _c_mindrecord) + if (NOT ENABLE_GE) + target_link_libraries(_c_dataengine PRIVATE mindspore::pslite mindspore::protobuf ${zeromq_DIRPATH}/zmq_install/lib/libzmq.a) + if (${ENABLE_IBVERBS} STREQUAL "ON") + target_link_libraries(_c_dataengine PRIVATE ibverbs rdmacm) + endif() + endif() endif() if (USE_GLOG) diff --git a/mindspore/ccsrc/pipeline/jit/action.cc b/mindspore/ccsrc/pipeline/jit/action.cc index 74eb9f3f9b5..409bd28a6da 100644 --- a/mindspore/ccsrc/pipeline/jit/action.cc +++ b/mindspore/ccsrc/pipeline/jit/action.cc @@ -40,6 +40,11 @@ #include "vm/transform.h" #include "parse/python_adapter.h" #include "frontend/optimizer/py_pass_manager.h" +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) +#include "frontend/parallel/ps/parameter_server.h" +#include "frontend/parallel/ps/scheduler.h" +#include "frontend/parallel/ps/worker.h" +#endif namespace mindspore { namespace pipeline { @@ -374,6 +379,25 @@ bool ExecuteAction(const ResourcePtr &res) { return true; } +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) +bool StartPSWorkerAction(const ResourcePtr &res) { + parallel::ps::Worker::GetInstance().Run(); + return true; +} + +bool StartPSServerAction(const ResourcePtr &res) { + FuncGraphPtr func_graph = res->func_graph(); + auto &ps = parallel::ps::ParameterServer::GetInstance(); + ps.Run(func_graph); + return true; +} + +bool StartPSSchedulerAction(const ResourcePtr &res) { + parallel::ps::Scheduler::GetInstance().Run(); + return true; +} +#endif + // The parallel primitive related valuenode might be partitioned so that its value changes by device, // that will result in a syncronization error due to different executing order. 
// Here we temporarily avoid the problem by skipping valuenode merging used by parallel related primitive, @@ -481,7 +505,11 @@ std::vector VmPipeline() { actions.emplace_back(std::make_pair("py_opt", OptActionPyStub)); actions.emplace_back(std::make_pair("validate", ValidateAction)); - +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + if (parallel::ps::Util::IsRoleOfWorker()) { + actions.emplace_back(std::make_pair("worker", StartPSWorkerAction)); + } +#endif // compile the ANF graph actions.emplace_back(std::make_pair("task_emit", TaskEmitAction)); @@ -490,5 +518,21 @@ std::vector VmPipeline() { return actions; } + +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) +std::vector PServerPipeline() { + auto actions = CommonPipeline(); + actions.emplace_back(std::make_pair("optimize", VmOptimizeAction)); + actions.emplace_back(std::make_pair("validate", ValidateAction)); + actions.emplace_back(std::make_pair("pserver", StartPSServerAction)); + return actions; +} + +std::vector PSchedulerPipeline() { + std::vector actions; + actions.emplace_back(std::make_pair("scheduler", StartPSSchedulerAction)); + return actions; +} +#endif } // namespace pipeline } // namespace mindspore diff --git a/mindspore/ccsrc/pipeline/jit/action.h b/mindspore/ccsrc/pipeline/jit/action.h index 0a1feab1c9f..03ea2450d95 100644 --- a/mindspore/ccsrc/pipeline/jit/action.h +++ b/mindspore/ccsrc/pipeline/jit/action.h @@ -38,9 +38,14 @@ bool VmOptimizeAction(const ResourcePtr &res); bool PynativeOptimizeAction(const ResourcePtr &res); bool TaskEmitAction(const ResourcePtr &res); bool ExecuteAction(const ResourcePtr &res); +bool StartPSWorkerAction(const ResourcePtr &res); +bool StartPSServerAction(const ResourcePtr &res); +bool StartPSSchedulerAction(const ResourcePtr &res); std::vector GePipeline(); std::vector VmPipeline(); +std::vector PServerPipeline(); +std::vector PSchedulerPipeline(); abstract::AnalysisResult AbstractAnalyze(const ResourcePtr &res, const FuncGraphPtr &func_graph, const abstract::AbstractBasePtrList &args_spec, bool clear = false); FuncGraphPtr ProgramSpecialize(const ResourcePtr &res, const FuncGraphPtr &func_graph, diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.cc b/mindspore/ccsrc/pipeline/jit/pipeline.cc index 05699793ff8..49bebfb3c42 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline.cc @@ -41,6 +41,11 @@ #include "pipeline/pynative/pynative_execute.h" #include "frontend/optimizer/py_pass_manager.h" +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) +#include "frontend/parallel/ps/common.h" +#include "frontend/parallel/ps/util.h" +#endif + #if (ENABLE_GE || ENABLE_D) #include "pipeline/jit/pipeline_ge.h" #include "transform/graph_ir/convert.h" @@ -420,6 +425,26 @@ bool ExecutorPy::CompileInner(const py::object &obj, const py::tuple &args, cons use_vm = ChangeExportGeirUseVmFlag(use_vm, phase_s); std::string backend = MsContext::GetInstance()->backend_policy(); +#if (!_WIN32 && !ENABLE_GE && !ENABLE_TESTCASES) + if (mindspore::parallel::ps::Util::IsParamServerMode()) { + mindspore::parallel::ps::Util::SetInternalEnvVar(); + } + if (parallel::ps::Util::IsRoleOfPServer()) { + resource->results()[kBackend] = compile::CreateBackend(); + p_actions = PServerPipeline(); + } else if (parallel::ps::Util::IsRoleOfScheduler()) { + p_actions = PSchedulerPipeline(); + } else if (use_vm && backend != "ge") { + // Create backend and session + auto backend_ptr = compile::CreateBackend(); + // Connect session to debugger + backend_ptr->SetDebugger(); + 
resource->results()[kBackend] = backend_ptr; + p_actions = VmPipeline(); + } else { + p_actions = GePipeline(); + } +#else if (use_vm && backend != "ge") { // Create backend and session auto backend_ptr = compile::CreateBackend(); @@ -430,6 +455,7 @@ bool ExecutorPy::CompileInner(const py::object &obj, const py::tuple &args, cons } else { p_actions = GePipeline(); } +#endif std::shared_ptr pip = std::make_shared(resource, FilterActions(p_actions, phase_s));
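
Note on the final hunk: it makes CompileInner choose an action pipeline by process role — parameter-server processes run PServerPipeline(), the scheduler runs PSchedulerPipeline(), and workers fall through to the usual VmPipeline() (which now also appends StartPSWorkerAction before task_emit). Below is a minimal, standalone sketch of that dispatch, not MindSpore code: it assumes the role is published through an MS_ROLE environment variable with values MS_PSERVER, MS_SCHED, and MS_WORKER, and the helper GetRole() plus the default-to-worker behavior are illustrative only.

// Standalone sketch of the role-based pipeline dispatch added to CompileInner.
// Assumption: the process role comes from the MS_ROLE environment variable
// (MS_PSERVER / MS_SCHED / MS_WORKER); GetRole() is a hypothetical helper.
#include <cstdlib>
#include <iostream>
#include <string>

namespace {
std::string GetRole() {
  const char *role = std::getenv("MS_ROLE");
  // For the sketch, treat an unset role as a plain worker process.
  return role == nullptr ? "MS_WORKER" : std::string(role);
}
}  // namespace

int main() {
  const std::string role = GetRole();
  if (role == "MS_PSERVER") {
    // Parameter server: optimize + validate, then block in StartPSServerAction,
    // applying gradients that workers push.
    std::cout << "run PServerPipeline()" << std::endl;
  } else if (role == "MS_SCHED") {
    // Scheduler: a single StartPSSchedulerAction that coordinates the other ranks.
    std::cout << "run PSchedulerPipeline()" << std::endl;
  } else {
    // Worker: the regular VM pipeline, with StartPSWorkerAction inserted so the
    // worker connects to the parameter servers before task_emit/execute.
    std::cout << "run VmPipeline()" << std::endl;
  }
  return 0;
}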