!40591 Reopen ps cases

Merge pull request !40591 from ZPaC/reopen-ps-cases
i-robot 2022-08-22 04:37:03 +00:00 committed by Gitee
commit 7fafa69610
19 changed files with 166 additions and 592 deletions

View File

@@ -31,7 +31,6 @@
#include "mindspore/core/utils/file_utils.h"
#ifdef WITH_BACKEND
#include "ps/ps_context.h"
#include "ps/core/node.h"
#include "distributed/cluster/cluster_context.h"
#endif
@@ -71,17 +70,6 @@ std::string GetCompileCacheDir() {
std::string GetRole() {
#ifdef WITH_BACKEND
const std::string &server_mode = ps::PSContext::instance()->server_mode();
if ((server_mode == ps::kServerModeFL || server_mode == ps::kServerModeHybrid) &&
ps::PSContext::instance()->is_server()) {
return kRoleServer;
}
if (ps::PSContext::instance()->is_server()) {
return kRolePServer;
}
if (ps::PSContext::instance()->is_scheduler()) {
return kRolePScheduler;
}
if (distributed::cluster::ClusterContext::instance()->initialized()) {
auto node = distributed::cluster::ClusterContext::instance()->node();
MS_EXCEPTION_IF_NULL(node);

View File

@@ -81,6 +81,7 @@ struct RpcActorSet {
std::vector<RecvActorPtr> recv_actors_;
};
using RpcActorSetPtr = std::shared_ptr<RpcActorSet>;
using RpcActorSetWeakPtr = std::weak_ptr<RpcActorSet>;
#endif
// The actor set generated by graph transformer is the execution unit of actor runtime.

View File

@@ -249,8 +249,8 @@ void RpcActorStatusUpdater::set_rpc_actors(const RpcActorSetPtr &rpc_actors) {
void RpcActorStatusUpdater::UpdateRpcActorStatus() const {
// To ensure performance, currently only the mux recv actor needs to update its ready status in embedding cache mode.
if (is_embedding_cache_server()) {
MS_EXCEPTION_IF_NULL(rpc_actors_);
for (auto &recv_actor : rpc_actors_->recv_actors_) {
MS_EXCEPTION_IF_NULL(rpc_actors_.lock());
for (auto &recv_actor : rpc_actors_.lock()->recv_actors_) {
MS_EXCEPTION_IF_NULL(recv_actor);
recv_actor->UpdateStatus();
}
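
The change from RpcActorSetPtr to RpcActorSetWeakPtr means the status updater no longer shares ownership of the actor set; if the runtime has already released it, the updater simply sees an expired pointer instead of keeping it alive. A rough Python analogue of the pattern, purely illustrative and not MindSpore code (names are hypothetical):

import weakref

class RpcActorSet:
    def __init__(self, recv_actors):
        self.recv_actors = recv_actors

class RpcActorStatusUpdater:
    def __init__(self):
        self._rpc_actors = None  # non-owning reference, like std::weak_ptr

    def set_rpc_actors(self, actor_set):
        self._rpc_actors = weakref.ref(actor_set)

    def update_rpc_actor_status(self):
        actor_set = self._rpc_actors() if self._rpc_actors else None  # "lock" once into a local
        if actor_set is None:
            return  # the owner already released the actor set
        for recv_actor in actor_set.recv_actors:
            recv_actor.update_status()

Locking the weak reference once into a local, as in the sketch, also means the validity check and the iteration see the same object, whereas the hunk above calls lock() twice.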

View File

@@ -105,7 +105,7 @@ class RpcActorStatusUpdater {
DISABLE_COPY_AND_ASSIGN(RpcActorStatusUpdater);
// Record rpc actors which need to update status.
RpcActorSetPtr rpc_actors_;
RpcActorSetWeakPtr rpc_actors_;
};
} // namespace runtime
} // namespace mindspore

View File

@@ -1931,8 +1931,10 @@ class Dataset:
"""
# If this is in distributed execution mode,
# the shard number and shard id might need to be updated according to the process's rank or role.
if _is_role_pserver() and _enable_distributed_mindrt():
num_shards = _get_ps_context("worker_num")
worker_num = _get_ps_context("worker_num")
server_num = _get_ps_context("server_num")
if _is_role_pserver() and _enable_distributed_mindrt() and (worker_num != server_num):
num_shards = worker_num
shard_id = 0
return num_shards, shard_id
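
The dataset hunk narrows the parameter-server override: shards are only rewritten under the distributed MindRT backend and only when the worker and server counts differ. A hedged, self-contained sketch of that decision with stand-in values (the real values come from _get_ps_context):

def adjust_shards(num_shards, shard_id, is_pserver, distributed_mindrt, worker_num, server_num):
    """Return the sharding a parameter server process should actually use (illustrative only)."""
    if is_pserver and distributed_mindrt and worker_num != server_num:
        # Presumably the server mirrors the workers' sharding so dataset sizes stay
        # consistent across roles, while always reading as shard 0 itself.
        return worker_num, 0
    return num_shards, shard_id

# Example: 8 workers, 1 server -> a server process reads as shard 0 of 8.
assert adjust_shards(1, 0, True, True, 8, 1) == (8, 0)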

View File

@@ -27,6 +27,7 @@ from mindspore.nn.metrics import Accuracy
from mindspore.train import Model
from mindspore.train.callback import LossMonitor
from mindspore.common.initializer import TruncatedNormal
from mindspore.communication.management import init
DATASET_PATH = "/home/workspace/mindspore_dataset/mnist"
context.set_context(mode=context.GRAPH_MODE, enable_compile_cache=True, compile_cache_path=sys.argv[1])
@@ -120,6 +121,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1,
if __name__ == "__main__":
init()
network = LeNet5(10)
network.set_param_ps()
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")

View File

@@ -192,9 +192,9 @@ def run_lenet_ps_twice(file_name, cache_path, log_file_name_first, log_file_name
first_str_to_check = "Check the consistency of dependency files hash failed. Execute all the compilation actions."
start_ps_subprocess(file_name, cache_path, first_str_to_check, log_file_name_first)
assert os.path.exists(cache_path)
check_compile_cache_files(cache_path, "")
check_compile_cache_files(cache_path, "pserver_")
check_compile_cache_files(cache_path, "pscheduler_")
check_compile_cache_files(cache_path, "MS_WORKER")
check_compile_cache_files(cache_path, "MS_PSERVER")
check_compile_cache_files(cache_path, "MS_SCHED")
# Second run
os.environ['MS_SCHED_PORT'] = '8183'
second_str_to_check = "Use the compilation cache and execute the backend actions only. Be aware of correctness" \

View File

@@ -23,43 +23,58 @@ export MS_SERVER_NUM=$3
export MS_SCHED_HOST=$4
export MS_SCHED_PORT=$5
export MS_ROLE=MS_SCHED
for((i=0;i<1;i++));
do
rm -rf ${execute_path}/sched_$i/
mkdir ${execute_path}/sched_$i/
cd ${execute_path}/sched_$i/ || exit
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET &
done
rm -rf ${execute_path}/sched/
mkdir ${execute_path}/sched/
cd ${execute_path}/sched/ || exit
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > sched.log 2>&1 &
sched_pid=`echo $!`
export MS_ROLE=MS_PSERVER
server_pids=()
for((i=0;i<$MS_SERVER_NUM;i++));
do
rm -rf ${execute_path}/server_$i/
mkdir ${execute_path}/server_$i/
cd ${execute_path}/server_$i/ || exit
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET &
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > server_$i.log 2>&1 &
server_pids[${i}]=`echo $!`
done
export MS_ROLE=MS_WORKER
process_pid=()
worker_pids=()
for((i=0;i<$MS_WORKER_NUM;i++));
do
rm -rf ${execute_path}/worker_$i/
mkdir ${execute_path}/worker_$i/
cd ${execute_path}/worker_$i/ || exit
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET &
process_pid[${i}]=`echo $!`
python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET > worker_$i.log 2>&1 &
worker_pids[${i}]=`echo $!`
done
for((i=0; i<${MS_WORKER_NUM}; i++)); do
wait ${process_pid[i]}
wait ${worker_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_cmp_sparse_embedding failed. status: ${status}"
echo "[ERROR] test_cmp_sparse_embedding failed. Failed to wait worker_{$i}, status: ${status}"
exit 1
else
echo "[INFO] test_cmp_sparse_embedding success."
fi
done
for((i=0; i<${MS_SERVER_NUM}; i++)); do
wait ${server_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_cmp_sparse_embedding failed. Failed to wait server_{$i}, status: ${status}"
exit 1
fi
done
wait ${sched_pid}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_cmp_sparse_embedding failed. Failed to wait scheduler, status: ${status}"
exit 1
fi
exit 0
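
The rewritten launcher gives every role its own directory and log file, records each PID, and now waits on the servers and the scheduler as well as the workers before reporting success. A rough Python equivalent of that launch-and-wait pattern, offered only as a sketch (script name and directory layout are placeholders):

import os
import subprocess

def launch(role, workdir, script="test_cmp_sparse_embedding.py"):
    """Start one process for the given MS_ROLE, logging into its own directory."""
    os.makedirs(workdir, exist_ok=True)
    env = dict(os.environ, MS_ROLE=role)
    log = open(os.path.join(workdir, workdir + ".log"), "w")
    return subprocess.Popen(["python", os.path.abspath(script)], cwd=workdir,
                            env=env, stdout=log, stderr=subprocess.STDOUT)

procs = [launch("MS_SCHED", "sched")]
procs += [launch("MS_PSERVER", f"server_{i}") for i in range(1)]  # MS_SERVER_NUM
procs += [launch("MS_WORKER", f"worker_{i}") for i in range(1)]   # MS_WORKER_NUM

for p in procs:
    if p.wait() != 0:  # fail fast, like the shell script's per-PID wait / exit 1
        raise SystemExit(f"a role process exited with status {p.returncode}")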

View File

@@ -25,7 +25,7 @@ from mindspore.nn import TrainOneStepCell, WithLossCell
from mindspore.nn.optim import Adam
from mindspore.common import set_seed
from mindspore.ops import operations as P
from mindspore.parallel._ps_context import _is_role_worker
from mindspore.parallel._ps_context import _is_role_worker, _is_role_sched
from mindspore.communication.management import init
parser = argparse.ArgumentParser(description="test_sparse_embedding")
@@ -56,7 +56,7 @@ class LeNet5(nn.Cell):
def do_sparse_embedding(ps=False):
epoch = 10
epoch = 5
net = LeNet5(10)
if ps:
net.embedding.embedding_table.set_param_ps()
@@ -71,9 +71,11 @@ def do_sparse_embedding(ps=False):
for _ in range(epoch):
data = Tensor(np.random.randint(-5, 15, (32, 3), np.int32))
label = Tensor(np.random.randint(0, 9, (32), np.int32))
loss = train_network(data, label).asnumpy()
losses.append(loss)
print(losses)
loss = train_network(data, label)
if _is_role_sched():
return None
losses.append(loss.asnumpy())
print("Do sprase embedding losses:", losses)
return losses
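
Every role now runs the same do_sparse_embedding entry point, so the loop bails out on the scheduler before it tries to pull loss values to the host. A minimal, hypothetical illustration of that role gate (not the actual test code):

def run_epochs(train_step, batches, is_sched):
    """Run the training loop; the scheduler joins the cluster but returns no losses."""
    losses = []
    for data, label in batches:
        loss = train_step(data, label)
        if is_sched:
            return None  # scheduler: take part in graph startup, skip host-side results
        losses.append(float(loss))  # stands in for loss.asnumpy()
    return losses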

View File

@@ -13,8 +13,25 @@
# limitations under the License.
# ============================================================================
import os
import pytest
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_single
def test_cmp_sparse_embedding():
return_code = os.system("bash shell_run_test.sh GPU 1 1 127.0.0.1 8081")
"""
Feature: Parameter Server.
Description: Test sparse optimizer for ps.
Expectation: success.
"""
return_code = os.system("bash shell_run_test.sh Ascend 1 1 127.0.0.1 8081")
if return_code != 0:
os.system(f"echo '\n**************** Worker Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log")
os.system(f"echo '\n**************** Server Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log")
os.system(f"echo '\n**************** Scheduler Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log")
assert return_code == 0
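
The three reopened cases repeat the same grep-and-echo block to surface errors from each role's log when the shell script fails. A small helper like the one below (hypothetical, not part of the PR) captures the same behaviour:

import glob
import subprocess

def dump_error_logs():
    """Print error lines from the per-role logs, mirroring the os.system calls above."""
    sections = {
        "Worker Log": "./worker*/worker*.log",
        "Server Log": "./server*/server*.log",
        "Scheduler Log": "./sched/sched.log",
    }
    for title, pattern in sections.items():
        print(f"\n**************** {title} ****************")
        for log in sorted(glob.glob(pattern)):
            # check=False: grep returning 1 (no matches) is not an error here.
            subprocess.run(["grep", "-E", "ERROR|Error|error", log], check=False)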

View File

@@ -25,43 +25,58 @@ export MS_SCHED_HOST=$5
export MS_SCHED_PORT=$6
export MS_ROLE=MS_SCHED
export fusion=True
for((i=0;i<1;i++));
do
rm -rf ${execute_path}/sched_$i/
mkdir ${execute_path}/sched_$i/
cd ${execute_path}/sched_$i/ || exit
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
done
rm -rf ${execute_path}/sched/
mkdir ${execute_path}/sched/
cd ${execute_path}/sched/ || exit
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > sched.log 2>&1 &
sched_pid=`echo $!`
export MS_ROLE=MS_PSERVER
server_pids=()
for((i=0;i<$MS_SERVER_NUM;i++));
do
rm -rf ${execute_path}/server_$i/
mkdir ${execute_path}/server_$i/
cd ${execute_path}/server_$i/ || exit
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > server_$i.log 2>&1 &
server_pids[${i}]=`echo $!`
done
export MS_ROLE=MS_WORKER
process_pid=()
worker_pids=()
for((i=0;i<$MS_WORKER_NUM;i++));
do
rm -rf ${execute_path}/worker_$i/
mkdir ${execute_path}/worker_$i/
cd ${execute_path}/worker_$i/ || exit
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
process_pid[${i}]=`echo $!`
python ${self_path}/../test_full_ps_lenet.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > worker_$i.log 2>&1 &
worker_pids[${i}]=`echo $!`
done
for((i=0; i<${MS_WORKER_NUM}; i++)); do
wait ${process_pid[i]}
wait ${worker_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_full_ps_lenet failed. status: ${status}"
echo "[ERROR] test_full_ps_lenet failed. Failed to wait worker_{$i}, status: ${status}"
exit 1
else
echo "[INFO] test_full_ps_lenet success."
fi
done
for((i=0; i<${MS_SERVER_NUM}; i++)); do
wait ${server_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_full_ps_lenet failed. Failed to wait server_{$i}, status: ${status}"
exit 1
fi
done
wait ${sched_pid}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_full_ps_lenet failed. Failed to wait scheduler, status: ${status}"
exit 1
fi
exit 0

View File

@@ -13,10 +13,27 @@
# limitations under the License.
# ============================================================================
import os
import pytest
def test_full_ps_ascend_lenet():
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_single
def test_full_ps_lenet():
"""
Feature: Parameter Server.
Description: Test LeNet accuracy in ps mode.
Expectation: success.
"""
return_code = os.system(
"bash shell_run_test.sh GPU /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8082"
"bash shell_run_test.sh Ascend /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8082"
)
if return_code != 0:
os.system(f"echo '\n**************** Worker Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log")
os.system(f"echo '\n**************** Server Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log")
os.system(f"echo '\n**************** Scheduler Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log")
assert return_code == 0

View File

@@ -1,35 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import os
def test_ps_ascend_multi_worker_multi_server():
return_code = os.system("bash shell_run_test.sh Ascend 8 8 127.0.0.1 8088")
assert return_code == 0
def test_ps_ascend():
return_code = os.system("bash shell_run_test.sh Ascend 1 1 127.0.0.1 8088")
assert return_code == 0
def test_ps_gpu_multi_worker_multi_server():
return_code = os.system("bash shell_run_test.sh GPU 8 8 127.0.0.1 8088")
assert return_code == 0
def test_ps_gpu():
return_code = os.system("bash shell_run_test.sh GPU 1 1 127.0.0.1 8088")
assert return_code == 0

View File

@@ -1,283 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""ResNet."""
import numpy as np
import mindspore.nn as nn
from mindspore.ops import operations as P
from mindspore.common.tensor import Tensor
def _weight_variable(shape, factor=0.01):
init_value = np.random.randn(*shape).astype(np.float32) * factor
return Tensor(init_value)
def _conv3x3(in_channel, out_channel, stride=1):
weight_shape = (out_channel, in_channel, 3, 3)
weight = _weight_variable(weight_shape)
return nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride, padding=0, pad_mode='same', weight_init=weight)
def _conv1x1(in_channel, out_channel, stride=1):
weight_shape = (out_channel, in_channel, 1, 1)
weight = _weight_variable(weight_shape)
return nn.Conv2d(in_channel, out_channel,
kernel_size=1, stride=stride, padding=0, pad_mode='same', weight_init=weight)
def _conv7x7(in_channel, out_channel, stride=1):
weight_shape = (out_channel, in_channel, 7, 7)
weight = _weight_variable(weight_shape)
return nn.Conv2d(in_channel, out_channel,
kernel_size=7, stride=stride, padding=0, pad_mode='same', weight_init=weight)
def _bn(channel):
return nn.BatchNorm2d(channel, eps=1e-4, momentum=0.9,
gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)
def _bn_last(channel):
return nn.BatchNorm2d(channel, eps=1e-4, momentum=0.9,
gamma_init=0, beta_init=0, moving_mean_init=0, moving_var_init=1)
def _fc(in_channel, out_channel):
weight_shape = (out_channel, in_channel)
weight = _weight_variable(weight_shape)
return nn.Dense(in_channel, out_channel, has_bias=True, weight_init=weight, bias_init=0)
class ResidualBlock(nn.Cell):
"""
ResNet V1 residual block definition.
Args:
in_channel (int): Input channel.
out_channel (int): Output channel.
stride (int): Stride size for the first convolutional layer. Default: 1.
Returns:
Tensor, output tensor.
Examples:
>>> ResidualBlock(3, 256, stride=2)
"""
expansion = 4
def __init__(self,
in_channel,
out_channel,
stride=1):
super(ResidualBlock, self).__init__()
channel = out_channel // self.expansion
self.conv1 = _conv1x1(in_channel, channel, stride=1)
self.bn1 = _bn(channel)
self.conv2 = _conv3x3(channel, channel, stride=stride)
self.bn2 = _bn(channel)
self.conv3 = _conv1x1(channel, out_channel, stride=1)
self.bn3 = _bn_last(out_channel)
self.relu = nn.ReLU()
self.down_sample = False
if stride != 1 or in_channel != out_channel:
self.down_sample = True
self.down_sample_layer = None
if self.down_sample:
self.down_sample_layer = nn.SequentialCell([_conv1x1(in_channel, out_channel, stride),
_bn(out_channel)])
self.add = P.Add()
def construct(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.down_sample:
identity = self.down_sample_layer(identity)
out = self.add(out, identity)
out = self.relu(out)
return out
class ResNet(nn.Cell):
"""
ResNet architecture.
Args:
block (Cell): Block for network.
layer_nums (list): Numbers of block in different layers.
in_channels (list): Input channel in each layer.
out_channels (list): Output channel in each layer.
strides (list): Stride size in each layer.
num_classes (int): The number of classes that the training images belong to.
Returns:
Tensor, output tensor.
Examples:
>>> ResNet(ResidualBlock,
>>> [3, 4, 6, 3],
>>> [64, 256, 512, 1024],
>>> [256, 512, 1024, 2048],
>>> [1, 2, 2, 2],
>>> 10)
"""
def __init__(self,
block,
layer_nums,
in_channels,
out_channels,
strides,
num_classes):
super(ResNet, self).__init__()
if not len(layer_nums) == len(in_channels) == len(out_channels) == 4:
raise ValueError("the length of layer_num, in_channels, out_channels list must be 4!")
self.conv1 = _conv7x7(3, 64, stride=2)
self.bn1 = _bn(64)
self.relu = P.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")
self.layer1 = self._make_layer(block,
layer_nums[0],
in_channel=in_channels[0],
out_channel=out_channels[0],
stride=strides[0])
self.layer2 = self._make_layer(block,
layer_nums[1],
in_channel=in_channels[1],
out_channel=out_channels[1],
stride=strides[1])
self.layer3 = self._make_layer(block,
layer_nums[2],
in_channel=in_channels[2],
out_channel=out_channels[2],
stride=strides[2])
self.layer4 = self._make_layer(block,
layer_nums[3],
in_channel=in_channels[3],
out_channel=out_channels[3],
stride=strides[3])
self.mean = P.ReduceMean(keep_dims=True)
self.flatten = nn.Flatten()
self.end_point = _fc(out_channels[3], num_classes)
def _make_layer(self, block, layer_num, in_channel, out_channel, stride):
"""
Make stage network of ResNet.
Args:
block (Cell): Resnet block.
layer_num (int): Layer number.
in_channel (int): Input channel.
out_channel (int): Output channel.
stride (int): Stride size for the first convolutional layer.
Returns:
SequentialCell, the output layer.
Examples:
>>> _make_layer(ResidualBlock, 3, 128, 256, 2)
"""
layers = []
resnet_block = block(in_channel, out_channel, stride=stride)
layers.append(resnet_block)
for _ in range(1, layer_num):
resnet_block = block(out_channel, out_channel, stride=1)
layers.append(resnet_block)
return nn.SequentialCell(layers)
def construct(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
c1 = self.maxpool(x)
c2 = self.layer1(c1)
c3 = self.layer2(c2)
c4 = self.layer3(c3)
c5 = self.layer4(c4)
out = self.mean(c5, (2, 3))
out = self.flatten(out)
out = self.end_point(out)
return out
def resnet50(class_num=10):
"""
Get ResNet50 neural network.
Args:
class_num (int): Class number.
Returns:
Cell, cell instance of ResNet50 neural network.
Examples:
>>> net = resnet50(10)
"""
return ResNet(ResidualBlock,
[3, 4, 6, 3],
[64, 256, 512, 1024],
[256, 512, 1024, 2048],
[1, 2, 2, 2],
class_num)
def resnet101(class_num=1001):
"""
Get ResNet101 neural network.
Args:
class_num (int): Class number.
Returns:
Cell, cell instance of ResNet101 neural network.
Examples:
>>> net = resnet101(1001)
"""
return ResNet(ResidualBlock,
[3, 4, 23, 3],
[64, 256, 512, 1024],
[256, 512, 1024, 2048],
[1, 2, 2, 2],
class_num)

View File

@@ -1,80 +0,0 @@
#!/bin/bash
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
execute_path=$(pwd)
self_path=$(dirname $0)
export MS_SCHED_NUM=1
DEVICE_TARGET=$1
export MS_WORKER_NUM=$2
export MS_SERVER_NUM=$3
export MS_SCHED_HOST=$4
export MS_SCHED_PORT=$5
export MS_ROLE=MS_SCHED
for((i=0;i<1;i++));
do
rm -rf ${execute_path}/sched_$i/
mkdir ${execute_path}/sched_$i/
cd ${execute_path}/sched_$i/ || exit
python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET &
done
export MS_ROLE=MS_PSERVER
for((i=0;i<$MS_SERVER_NUM;i++));
do
rm -rf ${execute_path}/server_$i/
mkdir ${execute_path}/server_$i/
cd ${execute_path}/server_$i/ || exit
export RANK_ID=$i
export DEVICE_ID=$i
python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET &
done
export MS_ROLE=MS_WORKER
process_pid=()
if [ $DEVICE_TARGET == "Ascend" ];then
for((i=0;i<$MS_WORKER_NUM;i++));
do
rm -rf ${execute_path}/worker_$i/
mkdir ${execute_path}/worker_$i/
cd ${execute_path}/worker_$i/ || exit
export RANK_ID=$i
export DEVICE_ID=$i
python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET &
process_pid[${i}]=`echo $!`
done
for((i=0; i<${MS_WORKER_NUM}; i++)); do
wait ${process_pid[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_multi_full_ps failed. status: ${status}"
exit 1
else
echo "[INFO] test_multi_full_ps success."
fi
done
fi
if [ $DEVICE_TARGET == "GPU" ];then
rm -rf ${execute_path}/worker/
mkdir ${execute_path}/worker/
cd ${execute_path}/worker/ || exit
mpirun -n $MS_WORKER_NUM python ${self_path}/../test_multi_full_ps.py --device_target=$DEVICE_TARGET &
wait $!
fi
exit 0

View File

@@ -1,118 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import sys
import argparse
import numpy as np
import mindspore.context as context
import mindspore.nn as nn
from mindspore.common.initializer import TruncatedNormal
from mindspore import Tensor
from mindspore.nn import TrainOneStepCell, WithLossCell
from mindspore.communication.management import init, get_group_size
from mindspore.parallel._ps_context import _is_role_pserver
# from resnet import resnet50
parser = argparse.ArgumentParser(description="test_ps_lenet")
parser.add_argument("--device_target", type=str, default="Ascend")
args, _ = parser.parse_known_args()
device_target = args.device_target
context.set_context(mode=context.GRAPH_MODE, device_target=device_target)
context.set_ps_context(enable_ps=True)
if device_target == "GPU":
init()
def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
"""weight initial for conv layer"""
weight = weight_variable()
return nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
weight_init=weight,
has_bias=False,
pad_mode="valid",
)
def fc_with_initialize(input_channels, out_channels):
"""weight initial for fc layer"""
weight = weight_variable()
bias = weight_variable()
return nn.Dense(input_channels, out_channels, weight, bias)
def weight_variable():
"""weight initial"""
return TruncatedNormal(0.02)
class LeNet5(nn.Cell):
def __init__(self, num_class=10, channel=3):
super(LeNet5, self).__init__()
self.num_class = num_class
self.conv1 = conv(channel, 6, 5)
self.conv2 = conv(6, 16, 5)
self.fc1 = fc_with_initialize(16 * 5 * 5, 120)
self.fc2 = fc_with_initialize(120, 84)
self.fc3 = fc_with_initialize(84, self.num_class)
self.relu = nn.ReLU()
self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
self.flatten = nn.Flatten()
def construct(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.max_pool2d(x)
x = self.conv2(x)
x = self.relu(x)
x = self.max_pool2d(x)
x = self.flatten(x)
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
x = self.relu(x)
x = self.fc3(x)
return x
if __name__ == "__main__":
epoch = 5
np.random.seed(0)
network = LeNet5(10)
network.set_param_ps()
criterion = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9)
if device_target == "GPU":
context.set_auto_parallel_context(parallel_mode="data_parallel", gradients_mean=True,
device_num=get_group_size())
net_with_criterion = WithLossCell(network, criterion)
train_network = TrainOneStepCell(net_with_criterion, net_opt)
train_network.set_train()
losses = []
for _ in range(epoch):
data = Tensor(np.random.rand(32, 3, 32, 32).astype(np.float32))
label = Tensor(np.random.randint(0, 9, (32)).astype(np.int32))
if _is_role_pserver():
train_network(data, label)
sys.exit()
else:
loss = train_network(data, label).asnumpy()
losses.append(loss)
print(losses)

View File

@@ -24,45 +24,59 @@ export MS_SERVER_NUM=$4
export MS_SCHED_HOST=$5
export MS_SCHED_PORT=$6
export MS_ROLE=MS_SCHED
export fusion=True
for((i=0;i<1;i++));
do
rm -rf ${execute_path}/sched_$i/
mkdir ${execute_path}/sched_$i/
cd ${execute_path}/sched_$i/ || exit
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
done
rm -rf ${execute_path}/sched/
mkdir ${execute_path}/sched/
cd ${execute_path}/sched/ || exit
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > sched.log 2>&1 &
sched_pid=`echo $!`
export MS_ROLE=MS_PSERVER
server_pids=()
for((i=0;i<$MS_SERVER_NUM;i++));
do
rm -rf ${execute_path}/server_$i/
mkdir ${execute_path}/server_$i/
cd ${execute_path}/server_$i/ || exit
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > server_$i.log 2>&1 &
server_pids[${i}]=`echo $!`
done
export MS_ROLE=MS_WORKER
process_pid=()
worker_pids=()
for((i=0;i<$MS_WORKER_NUM;i++));
do
rm -rf ${execute_path}/worker_$i/
mkdir ${execute_path}/worker_$i/
cd ${execute_path}/worker_$i/ || exit
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH &
process_pid[${i}]=`echo $!`
python ${self_path}/../test_ps_embedding_heterogeneous_conv2d_adam.py --device_target=$DEVICE_TARGET --dataset_path=$DATASET_PATH > worker_$i.log 2>&1 &
worker_pids[${i}]=`echo $!`
done
for((i=0; i<${MS_WORKER_NUM}; i++)); do
wait ${process_pid[i]}
wait ${worker_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. status: ${status}"
echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait worker_{$i}, status: ${status}"
exit 1
else
echo "[INFO] test_ps_embedding_heterogeneous_conv2d_adam success."
fi
done
for((i=0; i<${MS_SERVER_NUM}; i++)); do
wait ${server_pids[i]}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait server_{$i}, status: ${status}"
exit 1
fi
done
wait ${sched_pid}
status=`echo $?`
if [ "${status}" != "0" ]; then
echo "[ERROR] test_ps_embedding_heterogeneous_conv2d_adam failed. Failed to wait scheduler, status: ${status}"
exit 1
fi
exit 0

View File

@@ -13,10 +13,27 @@
# limitations under the License.
# ============================================================================
import os
import pytest
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_single
def test_ps_embedding_heterogeneous_conv2d_adam():
"""
Feature: Parameter Server.
Description: Test embedding with heterogeneous conv2d.
Expectation: success.
"""
return_code = os.system(
"bash shell_run_test.sh GPU /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8085"
"bash shell_run_test.sh Ascend /home/workspace/mindspore_dataset/mnist 1 1 127.0.0.1 8085"
)
if return_code != 0:
os.system(f"echo '\n**************** Worker Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./worker*/worker*.log")
os.system(f"echo '\n**************** Server Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./server*/server*.log")
os.system(f"echo '\n**************** Scheduler Log ****************'")
os.system(f"grep -E 'ERROR|Error|error' ./sched/sched.log")
assert return_code == 0

View File

@@ -85,7 +85,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1,
create dataset for train or test
"""
# define dataset
mnist_ds = ds.MnistDataset(data_path)
mnist_ds = ds.MnistDataset(data_path, num_shards=512, shard_id=0)
resize_height, resize_width = 32, 32
rescale = 1.0 / 255.0
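
The last hunk shards the MNIST load itself: reading as shard 0 of 512 keeps roughly 1/512 of the samples, which presumably keeps the reopened parameter-server case fast. A quick illustration of the effect (the dataset path is a placeholder):

import mindspore.dataset as ds

full = ds.MnistDataset("/path/to/mnist")                              # placeholder path
part = ds.MnistDataset("/path/to/mnist", num_shards=512, shard_id=0)  # 1/512 of the data
print(full.get_dataset_size(), part.get_dataset_size())               # e.g. 60000 vs ~118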