From 60bdbf58646a1e2b4aeb1b03980dc6fb11e7d4da Mon Sep 17 00:00:00 2001 From: ZPaC Date: Wed, 17 Aug 2022 11:15:47 +0800 Subject: [PATCH] Reopen ps cases --- .../pipeline/jit/compile_cache_manager.cc | 12 - .../runtime/graph_scheduler/actor/actor_set.h | 1 + .../graph_scheduler/rpc_node_scheduler.cc | 4 +- .../graph_scheduler/rpc_node_scheduler.h | 2 +- .../mindspore/dataset/engine/datasets.py | 6 +- .../st/frontend_compile_cache/run_lenet_ps.py | 2 + .../test_compile_cache.py | 6 +- .../ps/cmp_sparse_embedding/shell_run_test.sh | 45 ++- .../test_cmp_sparse_embedding.py | 12 +- .../test_entry_cmp_sparse_embedding.py | 19 +- tests/st/ps/full_ps/shell_run_test.sh | 45 ++- .../st/ps/full_ps/test_entry_full_ps_lenet.py | 21 +- tests/st/ps/multi_full_ps/entry.py | 35 --- tests/st/ps/multi_full_ps/resnet.py | 283 ------------------ tests/st/ps/multi_full_ps/shell_run_test.sh | 80 ----- .../st/ps/multi_full_ps/test_multi_full_ps.py | 118 -------- tests/st/ps/part_ps/shell_run_test.sh | 46 ++- ..._ps_embedding_heterogeneous_conv2d_adam.py | 19 +- ..._ps_embedding_heterogeneous_conv2d_adam.py | 2 +- 19 files changed, 166 insertions(+), 592 deletions(-) delete mode 100644 tests/st/ps/multi_full_ps/entry.py delete mode 100755 tests/st/ps/multi_full_ps/resnet.py delete mode 100644 tests/st/ps/multi_full_ps/shell_run_test.sh delete mode 100644 tests/st/ps/multi_full_ps/test_multi_full_ps.py diff --git a/mindspore/ccsrc/pipeline/jit/compile_cache_manager.cc b/mindspore/ccsrc/pipeline/jit/compile_cache_manager.cc index 9e89183b1ec..8d64c483900 100644 --- a/mindspore/ccsrc/pipeline/jit/compile_cache_manager.cc +++ b/mindspore/ccsrc/pipeline/jit/compile_cache_manager.cc @@ -31,7 +31,6 @@ #include "mindspore/core/utils/file_utils.h" #ifdef WITH_BACKEND -#include "ps/ps_context.h" #include "ps/core/node.h" #include "distributed/cluster/cluster_context.h" #endif @@ -71,17 +70,6 @@ std::string GetCompileCacheDir() { std::string GetRole() { #ifdef WITH_BACKEND - const std::string &server_mode = ps::PSContext::instance()->server_mode(); - if ((server_mode == ps::kServerModeFL || server_mode == ps::kServerModeHybrid) && - ps::PSContext::instance()->is_server()) { - return kRoleServer; - } - if (ps::PSContext::instance()->is_server()) { - return kRolePServer; - } - if (ps::PSContext::instance()->is_scheduler()) { - return kRolePScheduler; - } if (distributed::cluster::ClusterContext::instance()->initialized()) { auto node = distributed::cluster::ClusterContext::instance()->node(); MS_EXCEPTION_IF_NULL(node); diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_set.h b/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_set.h index 00bfcb0e376..eff4b55654e 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_set.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_set.h @@ -81,6 +81,7 @@ struct RpcActorSet { std::vector<RecvActorPtr> recv_actors_; }; using RpcActorSetPtr = std::shared_ptr<RpcActorSet>; +using RpcActorSetWeakPtr = std::weak_ptr<RpcActorSet>; #endif // The actor set generated by graph transformer is the execution unit of actor runtime.
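Note: the RpcActorSetWeakPtr alias added above lets RpcActorStatusUpdater observe the rpc actor set without extending its lifetime. In the rpc_node_scheduler.cc hunk that follows, lock() is called twice, which produces two independent shared_ptr temporaries: the instance that passes the MS_EXCEPTION_IF_NULL check is not the one dereferenced in the loop, and the second lock() could in principle return null on its own. A minimal sketch of promoting the weak pointer once (not the committed code; the names are taken from this patch):

void RpcActorStatusUpdater::UpdateRpcActorStatus() const {
  // Only the mux recv actors need a status refresh in embedding cache mode.
  if (is_embedding_cache_server()) {
    // Promote the weak pointer once and reuse the resulting shared_ptr.
    auto rpc_actors = rpc_actors_.lock();
    MS_EXCEPTION_IF_NULL(rpc_actors);
    for (auto &recv_actor : rpc_actors->recv_actors_) {
      MS_EXCEPTION_IF_NULL(recv_actor);
      recv_actor->UpdateStatus();
    }
  }
}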
diff --git a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc index 6c2ba803362..803193cd52e 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc @@ -249,8 +249,8 @@ void RpcActorStatusUpdater::set_rpc_actors(const RpcActorSetPtr &rpc_actors) { void RpcActorStatusUpdater::UpdateRpcActorStatus() const { // To ensure performance, only mux recv actor need to update ready status for embedding cache mode currently. if (is_embedding_cache_server()) { - MS_EXCEPTION_IF_NULL(rpc_actors_); - for (auto &recv_actor : rpc_actors_->recv_actors_) { + MS_EXCEPTION_IF_NULL(rpc_actors_.lock()); + for (auto &recv_actor : rpc_actors_.lock()->recv_actors_) { MS_EXCEPTION_IF_NULL(recv_actor); recv_actor->UpdateStatus(); } diff --git a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h index 851d20c3668..c2067d5a86a 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.h @@ -105,7 +105,7 @@ class RpcActorStatusUpdater { DISABLE_COPY_AND_ASSIGN(RpcActorStatusUpdater); // Record rpc actors which need to update status. - RpcActorSetPtr rpc_actors_; + RpcActorSetWeakPtr rpc_actors_; }; } // namespace runtime } // namespace mindspore diff --git a/mindspore/python/mindspore/dataset/engine/datasets.py b/mindspore/python/mindspore/dataset/engine/datasets.py index f7f2617213d..af64f938c68 100644 --- a/mindspore/python/mindspore/dataset/engine/datasets.py +++ b/mindspore/python/mindspore/dataset/engine/datasets.py @@ -1931,8 +1931,10 @@ class Dataset: """ # If this is in distributed execution mode, # the shard number and shard id might need to be updated according to the process's rank or role. 
- if _is_role_pserver() and _enable_distributed_mindrt(): - num_shards = _get_ps_context("worker_num") + worker_num = _get_ps_context("worker_num") + server_num = _get_ps_context("server_num") + if _is_role_pserver() and _enable_distributed_mindrt() and (worker_num != server_num): + num_shards = worker_num shard_id = 0 return num_shards, shard_id diff --git a/tests/st/frontend_compile_cache/run_lenet_ps.py b/tests/st/frontend_compile_cache/run_lenet_ps.py index 5e6ae38b35d..3e68df3aee1 100644 --- a/tests/st/frontend_compile_cache/run_lenet_ps.py +++ b/tests/st/frontend_compile_cache/run_lenet_ps.py @@ -27,6 +27,7 @@ from mindspore.nn.metrics import Accuracy from mindspore.train import Model from mindspore.train.callback import LossMonitor from mindspore.common.initializer import TruncatedNormal +from mindspore.communication.management import init DATASET_PATH = "/home/workspace/mindspore_dataset/mnist" context.set_context(mode=context.GRAPH_MODE, enable_compile_cache=True, compile_cache_path=sys.argv[1]) @@ -120,6 +121,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1, if __name__ == "__main__": + init() network = LeNet5(10) network.set_param_ps() net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean") diff --git a/tests/st/frontend_compile_cache/test_compile_cache.py b/tests/st/frontend_compile_cache/test_compile_cache.py index 2bb5fc5b620..726a00cadfb 100644 --- a/tests/st/frontend_compile_cache/test_compile_cache.py +++ b/tests/st/frontend_compile_cache/test_compile_cache.py @@ -192,9 +192,9 @@ def run_lenet_ps_twice(file_name, cache_path, log_file_name_first, log_file_name first_str_to_check = "Check the consistency of dependency files hash failed. Execute all the compilation actions." start_ps_subprocess(file_name, cache_path, first_str_to_check, log_file_name_first) assert os.path.exists(cache_path) - check_compile_cache_files(cache_path, "") - check_compile_cache_files(cache_path, "pserver_") - check_compile_cache_files(cache_path, "pscheduler_") + check_compile_cache_files(cache_path, "MS_WORKER") + check_compile_cache_files(cache_path, "MS_PSERVER") + check_compile_cache_files(cache_path, "MS_SCHED") # Second run os.environ['MS_SCHED_PORT'] = '8183' second_str_to_check = "Use the compilation cache and execute the backend actions only. 
Be aware of correctness" \ diff --git a/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh b/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh index 13d90f44384..5ac5c079e78 100644 --- a/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh +++ b/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh @@ -23,43 +23,58 @@ export MS_SERVER_NUM=$3 export MS_SCHED_HOST=$4 export MS_SCHED_PORT=$5 export MS_ROLE=MS_SCHED -for((i=0;i<1;i++)); -do - rm -rf ${execute_path}/sched_$i/ - mkdir ${execute_path}/sched_$i/ - cd ${execute_path}/sched_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & -done + +rm -rf ${execute_path}/sched/ +mkdir ${execute_path}/sched/ +cd ${execute_path}/sched/ || exit +python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > sched.log 2>&1 & +sched_pid=`echo $!` export MS_ROLE=MS_PSERVER +server_pids=() for((i=0;i<$MS_SERVER_NUM;i++)); do rm -rf ${execute_path}/server_$i/ mkdir ${execute_path}/server_$i/ cd ${execute_path}/server_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & + python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > server_$i.log 2>&1 & + server_pids[${i}]=`echo $!` done export MS_ROLE=MS_WORKER -process_pid=() +worker_pids=() for((i=0;i<$MS_WORKER_NUM;i++)); do rm -rf ${execute_path}/worker_$i/ mkdir ${execute_path}/worker_$i/ cd ${execute_path}/worker_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & - process_pid[${i}]=`echo $!` + python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > worker_$i.log 2>&1 & + worker_pids[${i}]=`echo $!` done for((i=0; i<${MS_WORKER_NUM}; i++)); do - wait ${process_pid[i]} + wait ${worker_pids[i]} status=`echo $?` if [ "${status}" != "0" ]; then - echo "[ERROR] test_cmp_sparse_embedding failed. status: ${status}" + echo "[ERROR] test_cmp_sparse_embedding failed. Failed to wait worker_{$i}, status: ${status}" exit 1 - else - echo "[INFO] test_cmp_sparse_embedding success." fi done +for((i=0; i<${MS_SERVER_NUM}; i++)); do + wait ${server_pids[i]} + status=`echo $?` + if [ "${status}" != "0" ]; then + echo "[ERROR] test_cmp_sparse_embedding failed. Failed to wait server_{$i}, status: ${status}" + exit 1 + fi +done + +wait ${sched_pid} +status=`echo $?` +if [ "${status}" != "0" ]; then + echo "[ERROR] test_cmp_sparse_embedding failed. 
Failed to wait scheduler, status: ${status}" exit 1 fi exit 0 diff --git a/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py b/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py index e624969b13d..f1e268fd350 100644 --- a/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py +++ b/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py @@ -25,7 +25,7 @@ from mindspore.nn import TrainOneStepCell, WithLossCell from mindspore.nn.optim import Adam from mindspore.common import set_seed from mindspore.ops import operations as P -from mindspore.parallel._ps_context import _is_role_worker +from mindspore.parallel._ps_context import _is_role_worker, _is_role_sched from mindspore.communication.management import init parser = argparse.ArgumentParser(description="test_sparse_embedding") @@ -56,7 +56,7 @@ class LeNet5(nn.Cell): def do_sparse_embedding(ps=False): - epoch = 10 + epoch = 5 net = LeNet5(10) if ps: net.embedding.embedding_table.set_param_ps() @@ -71,9 +71,11 @@ def do_sparse_embedding(ps=False): for _ in range(epoch): data = Tensor(np.random.randint(-5, 15, (32, 3), np.int32)) label = Tensor(np.random.randint(0, 9, (32), np.int32)) - loss = train_network(data, label).asnumpy() - losses.append(loss) - print(losses) + loss = train_network(data, label) + if _is_role_sched(): + return None + losses.append(loss.asnumpy()) + print("Do sparse embedding losses:", losses) return losses diff --git a/tests/st/ps/cmp_sparse_embedding/test_entry_cmp_sparse_embedding.py b/tests/st/ps/cmp_sparse_embedding/test_entry_cmp_sparse_embedding.py index 8659cecab48..9b6e24a57d2 100644 --- a/tests/st/ps/cmp_sparse_embedding/test_entry_cmp_sparse_embedding.py +++ b/tests/st/ps/cmp_sparse_embedding/test_entry_cmp_sparse_embedding.py @@ -13,8 +13,25 @@ # limitations under the License. # ============================================================================ import os +import pytest +@pytest.mark.level0 +@pytest.mark.platform_x86_ascend_training +@pytest.mark.platform_arm_ascend_training +@pytest.mark.env_single def test_cmp_sparse_embedding(): - return_code = os.system("bash shell_run_test.sh GPU 1 1 127.0.0.1 8081") + """ + Feature: Parameter Server. + Description: Test sparse optimizer for ps. + Expectation: success. 
+ """ + return_code = os.system("bash shell_run_test.sh Ascend 1 1 127.0.0.1 8081") + if return_code != 0: + os.system(f"echo '\n**************** Worker Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log") + os.system(f"echo '\n**************** Server Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log") + os.system(f"echo '\n**************** Scheduler Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log") assert return_code == 0 diff --git a/tests/st/ps/full_ps/shell_run_test.sh b/tests/st/ps/full_ps/shell_run_test.sh index 51869dbae82..a550c0053d6 100644 --- a/tests/st/ps/full_ps/shell_run_test.sh +++ b/tests/st/ps/full_ps/shell_run_test.sh @@ -25,43 +25,58 @@ export MS_SCHED_HOST=$5 export MS_SCHED_PORT=$6 export MS_ROLE=MS_SCHED export fusion=True -for((i=0;i<1;i++)); -do - rm -rf ${execute_path}/sched_$i/ - mkdir ${execute_path}/sched_$i/ - cd ${execute_path}/sched_$i/ || exit - python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & -done + +rm -rf ${execute_path}/sched/ +mkdir ${execute_path}/sched/ +cd ${execute_path}/sched/ || exit +python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > sched.log 2>&1 & +sched_pid=`echo $!` export MS_ROLE=MS_PSERVER +server_pids=() for((i=0;i<$MS_SERVER_NUM;i++)); do rm -rf ${execute_path}/server_$i/ mkdir ${execute_path}/server_$i/ cd ${execute_path}/server_$i/ || exit - python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & + python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > server_$i.log 2>&1 & + server_pids[${i}]=`echo $!` done export MS_ROLE=MS_WORKER -process_pid=() +worker_pids=() for((i=0;i<$MS_WORKER_NUM;i++)); do rm -rf ${execute_path}/worker_$i/ mkdir ${execute_path}/worker_$i/ cd ${execute_path}/worker_$i/ || exit - python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & - process_pid[${i}]=`echo $!` + python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > worker_$i.log 2>&1 & + worker_pids[${i}]=`echo $!` done for((i=0; i<${MS_WORKER_NUM}; i++)); do - wait ${process_pid[i]} + wait ${worker_pids[i]} status=`echo $?` if [ "${status}" != "0" ]; then - echo "[ERROR] test_full_ps_lenet failed. status: ${status}" + echo "[ERROR] test_full_ps_lenet failed. Failed to wait worker_{$i}, status: ${status}" exit 1 - else - echo "[INFO] test_full_ps_lenet success." fi done +for((i=0; i<${MS_SERVER_NUM}; i++)); do + wait ${server_pids[i]} + status=`echo $?` + if [ "${status}" != "0" ]; then + echo "[ERROR] test_full_ps_lenet failed. Failed to wait server_{$i}, status: ${status}" + exit 1 + fi +done + +wait ${sched_pid} +status=`echo $?` +if [ "${status}" != "0" ]; then + echo "[ERROR] test_full_ps_lenet failed. Failed to wait scheduler, status: ${status}" + exit 1 +fi + exit 0 diff --git a/tests/st/ps/full_ps/test_entry_full_ps_lenet.py b/tests/st/ps/full_ps/test_entry_full_ps_lenet.py index a2f7426701c..343f39d8aba 100644 --- a/tests/st/ps/full_ps/test_entry_full_ps_lenet.py +++ b/tests/st/ps/full_ps/test_entry_full_ps_lenet.py @@ -13,10 +13,27 @@ # limitations under the License. 
# ============================================================================ import os +import pytest -def test_full_ps_ascend_lenet(): +@pytest.mark.level0 +@pytest.mark.platform_x86_ascend_training +@pytest.mark.platform_arm_ascend_training +@pytest.mark.env_single +def test_full_ps_lenet(): + """ + Feature: Parameter Server. + Description: Test LeNet accuracy in ps mode. + Expectation: success. + """ return_code = os.system( - "bash shell_run_test.sh GPU /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8082" + "bash shell_run_test.sh Ascend /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8082" ) + if return_code != 0: + os.system(f"echo '\n**************** Worker Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log") + os.system(f"echo '\n**************** Server Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log") + os.system(f"echo '\n**************** Scheduler Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log") assert return_code == 0 diff --git a/tests/st/ps/multi_full_ps/entry.py b/tests/st/ps/multi_full_ps/entry.py deleted file mode 100644 index d605e85dc41..00000000000 --- a/tests/st/ps/multi_full_ps/entry.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2020 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================ -import os - - -def test_ps_ascend_multi_worker_multi_server(): - return_code = os.system("bash shell_run_test.sh Ascend 8 8 127.0.0.1 8088") - assert return_code == 0 - - -def test_ps_ascend(): - return_code = os.system("bash shell_run_test.sh Ascend 1 1 127.0.0.1 8088") - assert return_code == 0 - - -def test_ps_gpu_multi_worker_multi_server(): - return_code = os.system("bash shell_run_test.sh GPU 8 8 127.0.0.1 8088") - assert return_code == 0 - - -def test_ps_gpu(): - return_code = os.system("bash shell_run_test.sh GPU 1 1 127.0.0.1 8088") - assert return_code == 0 diff --git a/tests/st/ps/multi_full_ps/resnet.py b/tests/st/ps/multi_full_ps/resnet.py deleted file mode 100755 index a7961c3e325..00000000000 --- a/tests/st/ps/multi_full_ps/resnet.py +++ /dev/null @@ -1,283 +0,0 @@ -# Copyright 2020 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================ -"""ResNet.""" -import numpy as np -import mindspore.nn as nn -from mindspore.ops import operations as P -from mindspore.common.tensor import Tensor - - -def _weight_variable(shape, factor=0.01): - init_value = np.random.randn(*shape).astype(np.float32) * factor - return Tensor(init_value) - - -def _conv3x3(in_channel, out_channel, stride=1): - weight_shape = (out_channel, in_channel, 3, 3) - weight = _weight_variable(weight_shape) - return nn.Conv2d(in_channel, out_channel, - kernel_size=3, stride=stride, padding=0, pad_mode='same', weight_init=weight) - - -def _conv1x1(in_channel, out_channel, stride=1): - weight_shape = (out_channel, in_channel, 1, 1) - weight = _weight_variable(weight_shape) - return nn.Conv2d(in_channel, out_channel, - kernel_size=1, stride=stride, padding=0, pad_mode='same', weight_init=weight) - - -def _conv7x7(in_channel, out_channel, stride=1): - weight_shape = (out_channel, in_channel, 7, 7) - weight = _weight_variable(weight_shape) - return nn.Conv2d(in_channel, out_channel, - kernel_size=7, stride=stride, padding=0, pad_mode='same', weight_init=weight) - - -def _bn(channel): - return nn.BatchNorm2d(channel, eps=1e-4, momentum=0.9, - gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1) - - -def _bn_last(channel): - return nn.BatchNorm2d(channel, eps=1e-4, momentum=0.9, - gamma_init=0, beta_init=0, moving_mean_init=0, moving_var_init=1) - - -def _fc(in_channel, out_channel): - weight_shape = (out_channel, in_channel) - weight = _weight_variable(weight_shape) - return nn.Dense(in_channel, out_channel, has_bias=True, weight_init=weight, bias_init=0) - - -class ResidualBlock(nn.Cell): - """ - ResNet V1 residual block definition. - - Args: - in_channel (int): Input channel. - out_channel (int): Output channel. - stride (int): Stride size for the first convolutional layer. Default: 1. - - Returns: - Tensor, output tensor. - - Examples: - >>> ResidualBlock(3, 256, stride=2) - """ - expansion = 4 - - def __init__(self, - in_channel, - out_channel, - stride=1): - super(ResidualBlock, self).__init__() - - channel = out_channel // self.expansion - self.conv1 = _conv1x1(in_channel, channel, stride=1) - self.bn1 = _bn(channel) - - self.conv2 = _conv3x3(channel, channel, stride=stride) - self.bn2 = _bn(channel) - - self.conv3 = _conv1x1(channel, out_channel, stride=1) - self.bn3 = _bn_last(out_channel) - - self.relu = nn.ReLU() - - self.down_sample = False - - if stride != 1 or in_channel != out_channel: - self.down_sample = True - self.down_sample_layer = None - - if self.down_sample: - self.down_sample_layer = nn.SequentialCell([_conv1x1(in_channel, out_channel, stride), - _bn(out_channel)]) - self.add = P.Add() - - def construct(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - out = self.relu(out) - - out = self.conv3(out) - out = self.bn3(out) - - if self.down_sample: - identity = self.down_sample_layer(identity) - - out = self.add(out, identity) - out = self.relu(out) - - return out - - -class ResNet(nn.Cell): - """ - ResNet architecture. - - Args: - block (Cell): Block for network. - layer_nums (list): Numbers of block in different layers. - in_channels (list): Input channel in each layer. - out_channels (list): Output channel in each layer. - strides (list): Stride size in each layer. - num_classes (int): The number of classes that the training images are belonging to. 
- Returns: - Tensor, output tensor. - - Examples: - >>> ResNet(ResidualBlock, - >>> [3, 4, 6, 3], - >>> [64, 256, 512, 1024], - >>> [256, 512, 1024, 2048], - >>> [1, 2, 2, 2], - >>> 10) - """ - - def __init__(self, - block, - layer_nums, - in_channels, - out_channels, - strides, - num_classes): - super(ResNet, self).__init__() - - if not len(layer_nums) == len(in_channels) == len(out_channels) == 4: - raise ValueError("the length of layer_num, in_channels, out_channels list must be 4!") - - self.conv1 = _conv7x7(3, 64, stride=2) - self.bn1 = _bn(64) - self.relu = P.ReLU() - self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same") - - self.layer1 = self._make_layer(block, - layer_nums[0], - in_channel=in_channels[0], - out_channel=out_channels[0], - stride=strides[0]) - self.layer2 = self._make_layer(block, - layer_nums[1], - in_channel=in_channels[1], - out_channel=out_channels[1], - stride=strides[1]) - self.layer3 = self._make_layer(block, - layer_nums[2], - in_channel=in_channels[2], - out_channel=out_channels[2], - stride=strides[2]) - self.layer4 = self._make_layer(block, - layer_nums[3], - in_channel=in_channels[3], - out_channel=out_channels[3], - stride=strides[3]) - - self.mean = P.ReduceMean(keep_dims=True) - self.flatten = nn.Flatten() - self.end_point = _fc(out_channels[3], num_classes) - - def _make_layer(self, block, layer_num, in_channel, out_channel, stride): - """ - Make stage network of ResNet. - - Args: - block (Cell): Resnet block. - layer_num (int): Layer number. - in_channel (int): Input channel. - out_channel (int): Output channel. - stride (int): Stride size for the first convolutional layer. - - Returns: - SequentialCell, the output layer. - - Examples: - >>> _make_layer(ResidualBlock, 3, 128, 256, 2) - """ - layers = [] - - resnet_block = block(in_channel, out_channel, stride=stride) - layers.append(resnet_block) - - for _ in range(1, layer_num): - resnet_block = block(out_channel, out_channel, stride=1) - layers.append(resnet_block) - - return nn.SequentialCell(layers) - - def construct(self, x): - x = self.conv1(x) - x = self.bn1(x) - x = self.relu(x) - c1 = self.maxpool(x) - - c2 = self.layer1(c1) - c3 = self.layer2(c2) - c4 = self.layer3(c3) - c5 = self.layer4(c4) - - out = self.mean(c5, (2, 3)) - out = self.flatten(out) - out = self.end_point(out) - - return out - - -def resnet50(class_num=10): - """ - Get ResNet50 neural network. - - Args: - class_num (int): Class number. - - Returns: - Cell, cell instance of ResNet50 neural network. - - Examples: - >>> net = resnet50(10) - """ - return ResNet(ResidualBlock, - [3, 4, 6, 3], - [64, 256, 512, 1024], - [256, 512, 1024, 2048], - [1, 2, 2, 2], - class_num) - - -def resnet101(class_num=1001): - """ - Get ResNet101 neural network. - - Args: - class_num (int): Class number. - - Returns: - Cell, cell instance of ResNet101 neural network. - - Examples: - >>> net = resnet101(1001) - """ - return ResNet(ResidualBlock, - [3, 4, 23, 3], - [64, 256, 512, 1024], - [256, 512, 1024, 2048], - [1, 2, 2, 2], - class_num) diff --git a/tests/st/ps/multi_full_ps/shell_run_test.sh b/tests/st/ps/multi_full_ps/shell_run_test.sh deleted file mode 100644 index f4979cfd1f6..00000000000 --- a/tests/st/ps/multi_full_ps/shell_run_test.sh +++ /dev/null @@ -1,80 +0,0 @@ -#!/bin/bash -# Copyright 2020 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================ - -execute_path=$(pwd) -self_path=$(dirname $0) -export MS_SCHED_NUM=1 -DEVICE_TARGET=$1 -export MS_WORKER_NUM=$2 -export MS_SERVER_NUM=$3 -export MS_SCHED_HOST=$4 -export MS_SCHED_PORT=$5 - -export MS_ROLE=MS_SCHED -for((i=0;i<1;i++)); -do - rm -rf ${execute_path}/sched_$i/ - mkdir ${execute_path}/sched_$i/ - cd ${execute_path}/sched_$i/ || exit - python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET & -done - -export MS_ROLE=MS_PSERVER -for((i=0;i<$MS_SERVER_NUM;i++)); -do - rm -rf ${execute_path}/server_$i/ - mkdir ${execute_path}/server_$i/ - cd ${execute_path}/server_$i/ || exit - export RANK_ID=$i - export DEVICE_ID=$i - python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET & -done - -export MS_ROLE=MS_WORKER -process_pid=() -if [ $DEVICE_TARGET == "Ascend" ];then -for((i=0;i<$MS_WORKER_NUM;i++)); -do - rm -rf ${execute_path}/worker_$i/ - mkdir ${execute_path}/worker_$i/ - cd ${execute_path}/worker_$i/ || exit - export RANK_ID=$i - export DEVICE_ID=$i - python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET & - process_pid[${i}]=`echo $!` -done - -for((i=0; i<${MS_WORKER_NUM}; i++)); do - wait ${process_pid[i]} - status=`echo $?` - if [ "${status}" != "0" ]; then - echo "[ERROR] test_multi_full_ps failed. status: ${status}" - exit 1 - else - echo "[INFO] test_multi_full_ps success." - fi -done -fi - -if [ $DEVICE_TARGET == "GPU" ];then - rm -rf ${execute_path}/worker/ - mkdir ${execute_path}/worker/ - cd ${execute_path}/worker/ || exit - mpirun -n $MS_WORKER_NUM python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET & - wait $! -fi - -exit 0 diff --git a/tests/st/ps/multi_full_ps/test_multi_full_ps.py b/tests/st/ps/multi_full_ps/test_multi_full_ps.py deleted file mode 100644 index ae5437417ad..00000000000 --- a/tests/st/ps/multi_full_ps/test_multi_full_ps.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2020 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================ - -import sys -import argparse -import numpy as np - -import mindspore.context as context -import mindspore.nn as nn -from mindspore.common.initializer import TruncatedNormal -from mindspore import Tensor -from mindspore.nn import TrainOneStepCell, WithLossCell -from mindspore.communication.management import init, get_group_size -from mindspore.parallel._ps_context import _is_role_pserver -# from resnet import resnet50 - -parser = argparse.ArgumentParser(description="test_ps_lenet") -parser.add_argument("--device_target", type=str, default="Ascend") -args, _ = parser.parse_known_args() -device_target = args.device_target -context.set_context(mode=context.GRAPH_MODE, device_target=device_target) -context.set_ps_context(enable_ps=True) -if device_target == "GPU": - init() - - -def conv(in_channels, out_channels, kernel_size, stride=1, padding=0): - """weight initial for conv layer""" - weight = weight_variable() - return nn.Conv2d( - in_channels, - out_channels, - kernel_size=kernel_size, - stride=stride, - padding=padding, - weight_init=weight, - has_bias=False, - pad_mode="valid", - ) - - -def fc_with_initialize(input_channels, out_channels): - """weight initial for fc layer""" - weight = weight_variable() - bias = weight_variable() - return nn.Dense(input_channels, out_channels, weight, bias) - - -def weight_variable(): - """weight initial""" - return TruncatedNormal(0.02) - - -class LeNet5(nn.Cell): - def __init__(self, num_class=10, channel=3): - super(LeNet5, self).__init__() - self.num_class = num_class - self.conv1 = conv(channel, 6, 5) - self.conv2 = conv(6, 16, 5) - self.fc1 = fc_with_initialize(16 * 5 * 5, 120) - self.fc2 = fc_with_initialize(120, 84) - self.fc3 = fc_with_initialize(84, self.num_class) - self.relu = nn.ReLU() - self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2) - self.flatten = nn.Flatten() - - def construct(self, x): - x = self.conv1(x) - x = self.relu(x) - x = self.max_pool2d(x) - x = self.conv2(x) - x = self.relu(x) - x = self.max_pool2d(x) - x = self.flatten(x) - x = self.fc1(x) - x = self.relu(x) - x = self.fc2(x) - x = self.relu(x) - x = self.fc3(x) - return x - - -if __name__ == "__main__": - epoch = 5 - np.random.seed(0) - network = LeNet5(10) - network.set_param_ps() - criterion = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean") - net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9) - if device_target == "GPU": - context.set_auto_parallel_context(parallel_mode="data_parallel", gradients_mean=True, - device_num=get_group_size()) - net_with_criterion = WithLossCell(network, criterion) - train_network = TrainOneStepCell(net_with_criterion, net_opt) - train_network.set_train() - losses = [] - for _ in range(epoch): - data = Tensor(np.random.rand(32, 3, 32, 32).astype(np.float32)) - label = Tensor(np.random.randint(0, 9, (32)).astype(np.int32)) - if _is_role_pserver(): - train_network(data, label) - sys.exit() - else: - loss = train_network(data, label).asnumpy() - losses.append(loss) - print(losses) diff --git a/tests/st/ps/part_ps/shell_run_test.sh b/tests/st/ps/part_ps/shell_run_test.sh index 9b59bb1e934..b1599a0b6a3 100644 --- a/tests/st/ps/part_ps/shell_run_test.sh +++ b/tests/st/ps/part_ps/shell_run_test.sh @@ -24,45 +24,59 @@ export MS_SERVER_NUM=$4 export MS_SCHED_HOST=$5 export MS_SCHED_PORT=$6 export MS_ROLE=MS_SCHED -export fusion=True -for((i=0;i<1;i++)); -do - rm -rf ${execute_path}/sched_$i/ - mkdir ${execute_path}/sched_$i/ - cd 
${execute_path}/sched_$i/ || exit - python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & -done + +rm -rf ${execute_path}/sched/ +mkdir ${execute_path}/sched/ +cd ${execute_path}/sched/ || exit +python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > sched.log 2>&1 & +sched_pid=`echo $!` export MS_ROLE=MS_PSERVER +server_pids=() for((i=0;i<$MS_SERVER_NUM;i++)); do rm -rf ${execute_path}/server_$i/ mkdir ${execute_path}/server_$i/ cd ${execute_path}/server_$i/ || exit - python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & + python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > server_$i.log 2>&1 & + server_pids[${i}]=`echo $!` done export MS_ROLE=MS_WORKER -process_pid=() +worker_pids=() for((i=0;i<$MS_WORKER_NUM;i++)); do rm -rf ${execute_path}/worker_$i/ mkdir ${execute_path}/worker_$i/ cd ${execute_path}/worker_$i/ || exit - python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH & - process_pid[${i}]=`echo $!` + python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > worker_$i.log 2>&1 & + worker_pids[${i}]=`echo $!` done for((i=0; i<${MS_WORKER_NUM}; i++)); do - wait ${process_pid[i]} + wait ${worker_pids[i]} status=`echo $?` if [ "${status}" != "0" ]; then - echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. status: ${status}" + echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait worker_{$i}, status: ${status}" exit 1 - else - echo "[INFO] test_ps_embedding_heterogeneous_conv2d_adam success." fi done +for((i=0; i<${MS_SERVER_NUM}; i++)); do + wait ${server_pids[i]} + status=`echo $?` + if [ "${status}" != "0" ]; then + echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait server_{$i}, status: ${status}" + exit 1 + fi +done + +wait ${sched_pid} +status=`echo $?` +if [ "${status}" != "0" ]; then + echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait scheduler, status: ${status}" + exit 1 +fi + exit 0 diff --git a/tests/st/ps/part_ps/test_entry_ps_embedding_heterogeneous_conv2d_adam.py b/tests/st/ps/part_ps/test_entry_ps_embedding_heterogeneous_conv2d_adam.py index 84faa87aa71..07ac7c10153 100644 --- a/tests/st/ps/part_ps/test_entry_ps_embedding_heterogeneous_conv2d_adam.py +++ b/tests/st/ps/part_ps/test_entry_ps_embedding_heterogeneous_conv2d_adam.py @@ -13,10 +13,27 @@ # limitations under the License. # ============================================================================ import os +import pytest +@pytest.mark.level0 +@pytest.mark.platform_x86_ascend_training +@pytest.mark.platform_arm_ascend_training +@pytest.mark.env_single def test_ps_embedding_heterogeneous_conv2d_adam(): + """ + Feature: Parameter Server. + Description: Test embedding with heterogeneous conv2d. + Expectation: success. 
+ """ return_code = os.system( - "bash shell_run_test.sh GPU /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8085" + "bash shell_run_test.sh Ascend /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8085" ) + if return_code != 0: + os.system(f"echo '\n**************** Worker Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log") + os.system(f"echo '\n**************** Server Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log") + os.system(f"echo '\n**************** Scheduler Log ****************'") + os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log") assert return_code == 0 diff --git a/tests/st/ps/part_ps/test_ps_embedding_heterogeneous_conv2d_adam.py b/tests/st/ps/part_ps/test_ps_embedding_heterogeneous_conv2d_adam.py index e290fcc3bab..9fb5814745d 100644 --- a/tests/st/ps/part_ps/test_ps_embedding_heterogeneous_conv2d_adam.py +++ b/tests/st/ps/part_ps/test_ps_embedding_heterogeneous_conv2d_adam.py @@ -85,7 +85,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1, create dataset for train or test """ # define dataset - mnist_ds = ds.MnistDataset(data_path) + mnist_ds = ds.MnistDataset(data_path, num_shards=512, shard_id=0) resize_height, resize_width = 32, 32 rescale = 1.0 / 255.0