test thor r1.6
This commit is contained in:
@ -35,5 +35,8 @@ config = ed({
"label_smooth_factor": 0.1,
"frequency": 834,
"eval_interval": 1,
"eval_batch_size": 32
"eval_batch_size": 32,
"train_image_size": 224,
"eval_image_size": 224,
"device_target": "Ascend"
@ -12,72 +12,172 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""create train or eval dataset."""
import os
import mindspore.common.dtype as mstype
create train or eval dataset.
import multiprocessing
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as C
import mindspore.dataset.transforms.c_transforms as C2
from mindspore.communication.management import init, get_rank, get_group_size
def create_dataset(dataset_path, do_train, repeat_num=1, batch_size=32):
def create_dataset1(dataset_path, do_train, batch_size=32, train_image_size=224, eval_image_size=224,
target="Ascend", distribute=False, enable_cache=False, cache_session_id=None):
create a train or eval dataset.
create a train or evaluate cifar10 dataset for resnet50
dataset_path(string): the path of dataset.
do_train(bool): whether dataset is used for train or eval.
repeat_num(int): the repeat times of dataset. Default: 1
batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
device_num, rank_id = _get_rank_info(distribute)
if device_num == 1:
data_set = ds.Cifar10Dataset(dataset_path, num_parallel_workers=get_num_parallel_workers(12), shuffle=True)
data_set = ds.Cifar10Dataset(dataset_path, num_parallel_workers=get_num_parallel_workers(12), shuffle=True,
num_shards=device_num, shard_id=rank_id)
# define map operations
trans = []
if do_train:
trans += [
ds.vision.c_transforms.RandomCrop((32, 32), (4, 4, 4, 4)),
trans += [
ds.vision.c_transforms.Resize((train_image_size, train_image_size)),
ds.vision.c_transforms.Rescale(1.0 / 255.0, 0.0),
ds.vision.c_transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
type_cast_op = ds.transforms.c_transforms.TypeCast(ms.int32)
data_set = data_set.map(operations=type_cast_op, input_columns="label",
# only enable cache for eval
if do_train:
enable_cache = False
if enable_cache:
if not cache_session_id:
raise ValueError("A cache session_id must be provided to use cache.")
eval_cache = ds.DatasetCache(session_id=int(cache_session_id), size=0)
data_set = data_set.map(operations=trans, input_columns="image",
num_parallel_workers=get_num_parallel_workers(8), cache=eval_cache)
data_set = data_set.map(operations=trans, input_columns="image",
# apply batch operations
data_set = data_set.batch(batch_size, drop_remainder=True)
return data_set
def create_dataset2(dataset_path, do_train, batch_size=32, train_image_size=224, eval_image_size=224,
target="Ascend", distribute=False, enable_cache=False, cache_session_id=None):
create a train or eval imagenet2012 dataset for resnet50
dataset_path(string): the path of dataset.
do_train(bool): whether dataset is used for train or eval.
repeat_num(int): the repeat times of dataset. Default: 1
batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
device_num, rank_id = _get_rank_info(distribute)
device_num = int(os.getenv("RANK_SIZE"))
rank_id = int(os.getenv("RANK_ID"))
if do_train:
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=16, shuffle=True)
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=12, shuffle=True,
num_shards=device_num, shard_id=rank_id)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=get_num_parallel_workers(12), shuffle=True)
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=12, shuffle=False,
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=get_num_parallel_workers(12), shuffle=True,
num_shards=device_num, shard_id=rank_id)
image_size = 224
mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
std = [0.229 * 255, 0.224 * 255, 0.225 * 255]
# define map operations
if do_train:
trans = [
C.Resize((256, 256)),
C.Normalize(mean=mean, std=std),
ds.vision.c_transforms.RandomCropDecodeResize(train_image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)),
trans = [
C.Resize((256, 256)),
C.Normalize(mean=mean, std=std),
trans_norm = [ds.vision.c_transforms.Normalize(mean=mean, std=std), ds.vision.c_transforms.HWC2CHW()]
type_cast_op = C2.TypeCast(mstype.int32)
type_cast_op = ds.transforms.c_transforms.TypeCast(ms.int32)
data_set = data_set.map(operations=trans, input_columns="image", num_parallel_workers=24)
data_set = data_set.map(operations=type_cast_op, input_columns="label", num_parallel_workers=12)
data_set = data_set.map(operations=trans, input_columns="image", num_parallel_workers=get_num_parallel_workers(12))
data_set = data_set.map(operations=trans_norm, input_columns="image",
# only enable cache for eval
if do_train:
enable_cache = False
if enable_cache:
if not cache_session_id:
raise ValueError("A cache session_id must be provided to use cache.")
eval_cache = ds.DatasetCache(session_id=int(cache_session_id), size=0)
data_set = data_set.map(operations=type_cast_op, input_columns="label",
data_set = data_set.map(operations=type_cast_op, input_columns="label",
# apply batch operations
# apply batch operations
data_set = data_set.batch(batch_size, drop_remainder=True)
# apply dataset repeat operation
data_set = data_set.repeat(repeat_num)
return data_set
def _get_rank_info(distribute):
    """
    Get rank size and rank id.

    Args:
        distribute (bool): whether the data is prepared for distributed training.

    Returns:
        tuple, ``(device_num, rank_id)``. When `distribute` is falsy this is
        always ``(1, 0)``; otherwise the values come from ``get_group_size()``
        and ``get_rank()``.
    """
    if distribute:
        rank_id = get_rank()
        device_num = get_group_size()
    else:
        # Single-device run: one shard, and this process owns it.
        rank_id = 0
        device_num = 1
    return device_num, rank_id
def get_num_parallel_workers(num_parallel_workers):
    """
    Get num_parallel_workers used in dataset operations.

    If num_parallel_workers > the real CPU cores number, set num_parallel_workers = the real CPU cores number.

    Args:
        num_parallel_workers (int): the requested number of parallel workers.

    Returns:
        int, the requested value when it is a positive integer that does not
        exceed the CPU core count; the core count when the request is too
        large; ``min(cores, 8)`` when the request is not a usable integer.
    """
    cores = multiprocessing.cpu_count()

    # bool is a subclass of int, and a zero/negative worker count cannot be
    # honoured, so both are handled by the "invalid" fallback as well.
    usable = (isinstance(num_parallel_workers, int)
              and not isinstance(num_parallel_workers, bool)
              and num_parallel_workers > 0)
    if not usable:
        fallback = min(cores, 8)
        print("The num_parallel_workers {} is invalid, now set it {}".format(num_parallel_workers, fallback))
        return fallback

    if cores < num_parallel_workers:
        print("The num_parallel_workers {} is set too large, now set it {}".format(num_parallel_workers, cores))
        return cores
    return num_parallel_workers
@ -574,7 +574,7 @@ class Model:
>>> model.train(2, dataset)
repeat_count = train_dataset.get_repeat_count()
if epoch != repeat_count and dataset_sink_mode is True:
if epoch != repeat_count and dataset_sink_mode:
logger.warning(f"The epoch_size {epoch} is not the same with dataset repeat_count {repeat_count}")
dataset_sink_mode = Validator.check_bool(dataset_sink_mode)
_device_number_check(self._parallel_mode, self._device_number)
@ -1,4 +1,4 @@
# Copyright 2020 Huawei Technologies Co., Ltd
# Copyright 2020-2021 Huawei Technologies Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,61 +13,149 @@
# limitations under the License.
# ============================================================================
import math
import numpy as np
from scipy.stats import truncnorm
import mindspore.nn as nn
import mindspore.ops as ops
import mindspore.common.dtype as mstype
from mindspore.ops import operations as P
from mindspore.ops import functional as F
from mindspore.common.tensor import Tensor
from src.model_utils.config import config
def _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size):
def conv_variance_scaling_initializer(in_channel, out_channel, kernel_size):
    """
    Build a truncated-normal initial weight for a square convolution kernel.

    Args:
        in_channel (int): number of input channels.
        out_channel (int): number of output channels.
        kernel_size (int): side length of the square kernel.

    Returns:
        Tensor, float32 weight of shape
        ``(out_channel, in_channel, kernel_size, kernel_size)``.
    """
    fan_in = in_channel * kernel_size * kernel_size
    base_std = (1.0 / max(1., fan_in)) ** 0.5

    if config.net_name == "resnet152":
        # resnet152 uses the plain 1/sqrt(fan_in) deviation.
        stddev = base_std
    else:
        # NOTE(review): the divisor is presumably the standard deviation of a
        # unit normal truncated to [-2, 2] - confirm before changing it.
        stddev = base_std / .87962566103423978

    weight_shape = (out_channel, in_channel, kernel_size, kernel_size)
    sample_count = out_channel * in_channel * kernel_size * kernel_size
    flat_weight = truncnorm(-2, 2, loc=0, scale=stddev).rvs(sample_count)
    return Tensor(np.reshape(flat_weight, weight_shape), dtype=mstype.float32)
def _weight_variable(shape, factor=0.01):
    """
    Create a weight Tensor of standard-normal noise scaled by `factor`.

    Args:
        shape (tuple): shape of the weight to create.
        factor (float): multiplier applied to the float32 samples. Default: 0.01.

    Returns:
        Tensor, the scaled random weight.
    """
    noise = np.random.randn(*shape)
    scaled = noise.astype(np.float32) * factor
    return Tensor(scaled)
def _conv3x3(in_channel, out_channel, stride=1, use_se=False):
def calculate_gain(nonlinearity, param=None):
    """
    Return the gain value associated with a nonlinearity.

    Args:
        nonlinearity (str): name of the nonlinear (or linear) function.
        param (Union[int, float, None]): negative slope, only used for
            'leaky_relu'. None selects 0.01. Default: None.

    Returns:
        Union[int, float], 1 for the linear functions and 'sigmoid', 5/3 for
        'tanh', sqrt(2) for 'relu', sqrt(2 / (1 + neg_slope ** 2)) for
        'leaky_relu'.

    Raises:
        ValueError: if `nonlinearity` is not supported, or if `param` is not a
            real number (bool is rejected) for 'leaky_relu'.
    """
    linear_fns = ['linear', 'conv1d', 'conv2d', 'conv3d', 'conv_transpose1d', 'conv_transpose2d', 'conv_transpose3d']
    if nonlinearity in linear_fns or nonlinearity == 'sigmoid':
        return 1
    if nonlinearity == 'tanh':
        return 5.0 / 3
    if nonlinearity == 'relu':
        return math.sqrt(2.0)
    if nonlinearity == 'leaky_relu':
        if param is None:
            neg_slope = 0.01
        elif isinstance(param, (int, float)) and not isinstance(param, bool):
            neg_slope = param
        else:
            raise ValueError("neg_slope {} not a valid number".format(param))
        return math.sqrt(2.0 / (1 + neg_slope ** 2))
    raise ValueError("Unsupported nonlinearity {}".format(nonlinearity))
def _calculate_fan_in_and_fan_out(tensor):
    """
    Compute fan-in and fan-out from a weight shape.

    Despite its name, `tensor` is used as a shape here: only ``len()`` and
    integer indexing are applied to it, and the callers in this file pass
    shape tuples.

    Args:
        tensor (Union[tuple, list]): weight shape laid out as
            ``(out, in, *kernel_dims)``.

    Returns:
        tuple, ``(fan_in, fan_out)``.

    Raises:
        ValueError: if the shape has fewer than 2 dimensions.
    """
    dimensions = len(tensor)
    if dimensions < 2:
        raise ValueError("Fan in and fan out can not be computed for tensor with fewer than 2 dimensions")

    # The receptive field is the product of every dimension after (out, in).
    # That is 1 for a 2-D (linear) weight and k*k for a 4-D conv weight, and
    # it also covers 3-D and 5-D weights instead of indexing [2] * [3] blindly.
    receptive_field_size = 1
    for extent in tensor[2:]:
        receptive_field_size *= extent

    fan_in = tensor[1] * receptive_field_size
    fan_out = tensor[0] * receptive_field_size
    return fan_in, fan_out
def _calculate_correct_fan(tensor, mode):
    """
    Select fan-in or fan-out for a weight shape.

    Args:
        tensor (Union[tuple, list]): weight shape forwarded to
            `_calculate_fan_in_and_fan_out`.
        mode (str): 'fan_in' or 'fan_out', matched case-insensitively.

    Returns:
        The fan value selected by `mode`.

    Raises:
        ValueError: if `mode` is neither 'fan_in' nor 'fan_out'.
    """
    selected = mode.lower()
    valid_modes = ['fan_in', 'fan_out']
    if selected not in valid_modes:
        raise ValueError("Unsupported mode {}, please use one of {}".format(selected, valid_modes))
    # The helper returns (fan_in, fan_out), in the same order as valid_modes.
    fan_by_mode = dict(zip(valid_modes, _calculate_fan_in_and_fan_out(tensor)))
    return fan_by_mode[selected]
def kaiming_normal(inputs_shape, a=0, mode='fan_in', nonlinearity='leaky_relu'):
    """
    Draw a float32 array from a zero-mean normal with std ``gain / sqrt(fan)``.

    Args:
        inputs_shape (tuple): shape of the array to generate.
        a (Union[int, float]): parameter forwarded to `calculate_gain`. Default: 0.
        mode (str): 'fan_in' or 'fan_out'. Default: 'fan_in'.
        nonlinearity (str): nonlinearity name forwarded to `calculate_gain`.
            Default: 'leaky_relu'.

    Returns:
        numpy.ndarray, float32 samples of shape `inputs_shape`.
    """
    fan_value = _calculate_correct_fan(inputs_shape, mode)
    stddev = calculate_gain(nonlinearity, a) / math.sqrt(fan_value)
    samples = np.random.normal(0, stddev, size=inputs_shape)
    return samples.astype(np.float32)
def kaiming_uniform(inputs_shape, a=0., mode='fan_in', nonlinearity='leaky_relu'):
    """
    Draw a float32 array uniformly from ``[-bound, bound)``.

    The bound is ``sqrt(3) * gain / sqrt(fan)``.

    Args:
        inputs_shape (tuple): shape of the array to generate.
        a (Union[int, float]): parameter forwarded to `calculate_gain`. Default: 0.
        mode (str): 'fan_in' or 'fan_out'. Default: 'fan_in'.
        nonlinearity (str): nonlinearity name forwarded to `calculate_gain`.
            Default: 'leaky_relu'.

    Returns:
        numpy.ndarray, float32 samples of shape `inputs_shape`.
    """
    fan_value = _calculate_correct_fan(inputs_shape, mode)
    stddev = calculate_gain(nonlinearity, a) / math.sqrt(fan_value)
    # Uniform limit derived from the target standard deviation.
    limit = math.sqrt(3.0) * stddev
    samples = np.random.uniform(-limit, limit, size=inputs_shape)
    return samples.astype(np.float32)
def _conv3x3(in_channel, out_channel, stride=1, use_se=False, res_base=False):
if use_se:
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=3)
weight = conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=3)
weight_shape = (out_channel, in_channel, 3, 3)
weight = _weight_variable(weight_shape)
return nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride, padding=0, pad_mode='same', weight_init=weight)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
if config.net_name == "resnet152":
weight = _weight_variable(weight_shape)
if res_base:
return nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride,
padding=1, pad_mode='pad', weight_init=weight)
return nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride,
padding=0, pad_mode='same', weight_init=weight)
def _conv1x1(in_channel, out_channel, stride=1, use_se=False):
def _conv1x1(in_channel, out_channel, stride=1, use_se=False, res_base=False):
if use_se:
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=1)
weight = conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=1)
weight_shape = (out_channel, in_channel, 1, 1)
weight = _weight_variable(weight_shape)
return nn.Conv2d(in_channel, out_channel,
kernel_size=1, stride=stride, padding=0, pad_mode='same', weight_init=weight)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
if config.net_name == "resnet152":
weight = _weight_variable(weight_shape)
if res_base:
return nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride,
padding=0, pad_mode='pad', weight_init=weight)
return nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride,
padding=0, pad_mode='same', weight_init=weight)
def _conv7x7(in_channel, out_channel, stride=1, use_se=False):
def _conv7x7(in_channel, out_channel, stride=1, use_se=False, res_base=False):
if use_se:
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=7)
weight = conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=7)
weight_shape = (out_channel, in_channel, 7, 7)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
if config.net_name == "resnet152":
weight = _weight_variable(weight_shape)
if res_base:
return nn.Conv2d(in_channel, out_channel,
kernel_size=7, stride=stride, padding=3, pad_mode='pad', weight_init=weight)
return nn.Conv2d(in_channel, out_channel,
kernel_size=7, stride=stride, padding=0, pad_mode='same', weight_init=weight)
def _bn(channel):
def _bn(channel, res_base=False):
if res_base:
return nn.BatchNorm2d(channel, eps=1e-5, momentum=0.1,
gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)
return nn.BatchNorm2d(channel, eps=1e-4, momentum=0.9,
gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)
@ -79,11 +167,13 @@ def _bn_last(channel):
def _fc(in_channel, out_channel, use_se=False):
if use_se:
weight = np.random.normal(loc=0, scale=0.01, size=out_channel*in_channel)
weight = np.random.normal(loc=0, scale=0.01, size=out_channel * in_channel)
weight = Tensor(np.reshape(weight, (out_channel, in_channel)), dtype=mstype.float32)
weight_shape = (out_channel, in_channel)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_uniform(weight_shape, a=math.sqrt(5)))
if config.net_name == "resnet152":
weight = _weight_variable(weight_shape)
return nn.Dense(in_channel, out_channel, has_bias=True, weight_init=weight, bias_init=0)
@ -95,8 +185,8 @@ class ResidualBlock(nn.Cell):
in_channel (int): Input channel.
out_channel (int): Output channel.
stride (int): Stride size for the first convolutional layer. Default: 1.
use_se (bool): enable SE-ResNet50 net. Default: False.
se_block(bool): use se block in SE-ResNet50 net. Default: False.
use_se (bool): Enable SE-ResNet50 net. Default: False.
se_block(bool): Use se block in SE-ResNet50 net. Default: False.
Tensor, output tensor.
@ -126,13 +216,15 @@ class ResidualBlock(nn.Cell):
self.bn2 = _bn(channel)
self.conv3 = _conv1x1(channel, out_channel, stride=1, use_se=self.use_se)
self.bn3 = _bn_last(out_channel)
self.bn3 = _bn(out_channel)
if config.optimizer == "Thor" or config.net_name == "resnet152":
self.bn3 = _bn_last(out_channel)
if self.se_block:
self.se_global_pool = P.ReduceMean(keep_dims=False)
self.se_dense_0 = _fc(out_channel, int(out_channel/4), use_se=self.use_se)
self.se_dense_1 = _fc(int(out_channel/4), out_channel, use_se=self.use_se)
self.se_global_pool = ops.ReduceMean(keep_dims=False)
self.se_dense_0 = _fc(out_channel, int(out_channel / 4), use_se=self.use_se)
self.se_dense_1 = _fc(int(out_channel / 4), out_channel, use_se=self.use_se)
self.se_sigmoid = nn.Sigmoid()
self.se_mul = P.Mul()
self.se_mul = ops.Mul()
self.relu = nn.ReLU()
self.down_sample = False
@ -153,7 +245,6 @@ class ResidualBlock(nn.Cell):
self.down_sample_layer = nn.SequentialCell([_conv1x1(in_channel, out_channel, stride,
use_se=self.use_se), _bn(out_channel)])
self.add = P.Add()
def construct(self, x):
identity = x
@ -176,13 +267,76 @@ class ResidualBlock(nn.Cell):
out = self.relu(out)
out = self.se_dense_1(out)
out = self.se_sigmoid(out)
out = F.reshape(out, F.shape(out) + (1, 1))
out = ops.reshape(out, ops.shape(out) + (1, 1))
out = self.se_mul(out, out_se)
if self.down_sample:
identity = self.down_sample_layer(identity)
out = self.add(out, identity)
out = out + identity
out = self.relu(out)
return out
class ResidualBlockBase(nn.Cell):
ResNet V1 residual block definition.
in_channel (int): Input channel.
out_channel (int): Output channel.
stride (int): Stride size for the first convolutional layer. Default: 1.
use_se (bool): Enable SE-ResNet50 net. Default: False.
se_block(bool): Use se block in SE-ResNet50 net. Default: False.
res_base (bool): Enable parameter setting of resnet18. Default: True.
Tensor, output tensor.
>>> ResidualBlockBase(3, 256, stride=2)
def __init__(self,
super(ResidualBlockBase, self).__init__()
self.res_base = res_base
self.conv1 = _conv3x3(in_channel, out_channel, stride=stride, res_base=self.res_base)
self.bn1d = _bn(out_channel)
self.conv2 = _conv3x3(out_channel, out_channel, stride=1, res_base=self.res_base)
self.bn2d = _bn(out_channel)
self.relu = nn.ReLU()
self.down_sample = False
if stride != 1 or in_channel != out_channel:
self.down_sample = True
self.down_sample_layer = None
if self.down_sample:
self.down_sample_layer = nn.SequentialCell([_conv1x1(in_channel, out_channel, stride,
use_se=use_se, res_base=self.res_base),
_bn(out_channel, res_base)])
def construct(self, x):
identity = x
out = self.conv1(x)
out = self.bn1d(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2d(out)
if self.down_sample:
identity = self.down_sample_layer(identity)
out = out + identity
out = self.relu(out)
return out
@ -199,8 +353,10 @@ class ResNet(nn.Cell):
out_channels (list): Output channel in each layer.
strides (list): Stride size in each layer.
num_classes (int): The number of classes that the training images are belonging to.
use_se (bool): enable SE-ResNet50 net. Default: False.
se_block(bool): use se block in SE-ResNet50 net in layer 3 and layer 4. Default: False.
use_se (bool): Enable SE-ResNet50 net. Default: False.
se_block(bool): Use se block in SE-ResNet50 net in layer 3 and layer 4. Default: False.
res_base (bool): Enable parameter setting of resnet18. Default: False.
Tensor, output tensor.
@ -220,27 +376,26 @@ class ResNet(nn.Cell):
super(ResNet, self).__init__()
if not len(layer_nums) == len(in_channels) == len(out_channels) == 4:
raise ValueError("the length of layer_num, in_channels, out_channels list must be 4!")
self.use_se = use_se
self.res_base = res_base
self.se_block = False
if self.use_se:
self.se_block = True
if self.use_se:
self.conv1_0 = _conv3x3(3, 32, stride=2, use_se=self.use_se)
self.bn1_0 = _bn(32)
self.conv1_1 = _conv3x3(32, 32, stride=1, use_se=self.use_se)
self.bn1_1 = _bn(32)
self.conv1_2 = _conv3x3(32, 64, stride=1, use_se=self.use_se)
self.conv1 = _conv7x7(3, 64, stride=2, res_base=self.res_base)
self.bn1 = _bn(64, self.res_base)
self.relu = ops.ReLU()
if self.res_base:
self.pad = nn.Pad(paddings=((0, 0), (0, 0), (1, 1), (1, 1)))
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="valid")
self.conv1 = _conv7x7(3, 64, stride=2)
self.bn1 = _bn(64)
self.relu = P.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")
self.layer1 = self._make_layer(block,
@ -268,7 +423,7 @@ class ResNet(nn.Cell):
self.mean = P.ReduceMean(keep_dims=True)
self.mean = ops.ReduceMean(keep_dims=True)
self.flatten = nn.Flatten()
self.end_point = _fc(out_channels[3], num_classes, use_se=self.use_se)
@ -282,7 +437,7 @@ class ResNet(nn.Cell):
in_channel (int): Input channel.
out_channel (int): Output channel.
stride (int): Stride size for the first convolutional layer.
se_block(bool): use se block in SE-ResNet50 net. Default: False.
se_block(bool): Use se block in SE-ResNet50 net. Default: False.
SequentialCell, the output layer.
@ -318,6 +473,8 @@ class ResNet(nn.Cell):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
if self.res_base:
x = self.pad(x)
c1 = self.maxpool(x)
c2 = self.layer1(c1)
@ -332,6 +489,50 @@ class ResNet(nn.Cell):
return out
def resnet18(class_num=10):
Get ResNet18 neural network.
class_num (int): Class number.
Cell, cell instance of ResNet18 neural network.
>>> net = resnet18(10)
return ResNet(ResidualBlockBase,
[2, 2, 2, 2],
[64, 64, 128, 256],
[64, 128, 256, 512],
[1, 2, 2, 2],
def resnet34(class_num=10):
Get ResNet34 neural network.
class_num (int): Class number.
Cell, cell instance of ResNet34 neural network.
>>> net = resnet34(10)
return ResNet(ResidualBlockBase,
[3, 4, 6, 3],
[64, 64, 128, 256],
[64, 128, 256, 512],
[1, 2, 2, 2],
def resnet50(class_num=10):
Get ResNet50 neural network.
@ -352,6 +553,7 @@ def resnet50(class_num=10):
[1, 2, 2, 2],
def se_resnet50(class_num=1001):
Get SE-ResNet50 neural network.
@ -373,6 +575,7 @@ def se_resnet50(class_num=1001):
def resnet101(class_num=1001):
Get ResNet101 neural network.
@ -392,3 +595,24 @@ def resnet101(class_num=1001):
[256, 512, 1024, 2048],
[1, 2, 2, 2],
def resnet152(class_num=1001):
Get ResNet152 neural network.
class_num (int): Class number.
Cell, cell instance of ResNet152 neural network.
# >>> net = resnet152(1001)
return ResNet(ResidualBlock,
[3, 8, 36, 3],
[64, 256, 512, 1024],
[256, 512, 1024, 2048],
[1, 2, 2, 2],
@ -37,7 +37,6 @@ Embedding = 3
LayerNorm = 4
BatchNorm = 5
_momentum_opt = C.MultitypeFuncGraph("momentum_opt")
op_add = P.AddN()
@ -59,6 +58,7 @@ def _tensor_run_opt_ext(opt, momentum, learning_rate, gradient, weight, moment):
success = F.depend(success, opt(weight, moment, learning_rate, gradient, momentum))
return success
C0 = 16
@ -122,11 +122,13 @@ def find_net_layertype_recur(net, layertype_map):
find_net_layertype_recur(subcell, layertype_map)
def get_net_layertype_mask(net):
layertype_map = []
find_net_layertype_recur(net, layertype_map)
return layertype_map
def get_layer_counter(layer_type, layer_counter, params, idx):
"""get layer counter"""
if layer_type in [Conv, FC, LayerNorm, BatchNorm]:
@ -247,7 +249,6 @@ class THOR_Ascend(Optimizer):
self.grad_reducer_A = DistributedGradReducer(self.matrix_A, mean, degree, fusion_type=6)
self.grad_reducer_G = DistributedGradReducer(self.matrix_A, mean, degree, fusion_type=8)
def _process_matrix_init_and_weight_idx_map(self, net):
"""process matrix init shape, and get weight idx map"""
layer_type_map = get_net_layertype_mask(net)
@ -26,18 +26,17 @@ from mindspore.common.tensor import Tensor
from mindspore.communication.management import init
from mindspore.context import ParallelMode
from mindspore.train.callback import Callback
from mindspore.train.model import Model
from mindspore.train.train_thor import ConvertModelUtils
from mindspore.train.loss_scale_manager import FixedLossScaleManager
from mindspore.nn.optim import thor
import mindspore.dataset as ds
from tests.st.networks.models.resnet50.src.dataset import create_dataset
from tests.st.networks.models.resnet50.src.metric import DistAccuracy, ClassifyCorrectCell
from tests.st.networks.models.resnet50.src.CrossEntropySmooth import CrossEntropySmooth
from tests.st.networks.models.resnet50.src_thor.config import config as thor_config
from tests.st.networks.models.resnet50.src_thor.dataset import create_dataset as create_dataset_thor
from tests.st.networks.models.resnet50.src_thor.model_thor import Model as THOR_Model
from tests.st.networks.models.resnet50.src_thor.resnet import resnet50 as resnet50_thor
from tests.st.networks.models.resnet50.src_thor.dataset import create_dataset2 as create_dataset_thor
from tests.st.networks.models.resnet50.src.resnet import resnet50
MINDSPORE_HCCL_CONFIG_PATH = "/home/workspace/mindspore_config/hccl/rank_table_8p.json"
dataset_path = "/home/workspace/mindspore_dataset/imagenet/imagenet_original/train"
@ -89,11 +88,12 @@ class LossGet(Callback):
self._per_print_times = per_print_times
self._loss = 0.0
self.data_size = data_size
self._epoch = 0
def step_end(self, run_context):
cb_params = run_context.original_args()
loss = cb_params.net_outputs
self._epoch = cb_params.cur_epoch_num
if isinstance(loss, (tuple, list)):
if isinstance(loss[0], Tensor) and isinstance(loss[0].asnumpy(), np.ndarray):
loss = loss[0]
@ -106,8 +106,11 @@ class LossGet(Callback):
if isinstance(loss, float) and (np.isnan(loss) or np.isinf(loss)):
raise ValueError("epoch: {} step: {}. Invalid loss, terminating training."
.format(cb_params.cur_epoch_num, cur_step_in_epoch))
cur_step_in_epoch = (cb_params.cur_step_num - 1) % cb_params.batch_num + 1
if self._per_print_times != 0 and cb_params.cur_step_num % self._per_print_times == 0:
self._loss = loss
print("epoch: %s step: %s, loss is %s" % (cb_params.cur_epoch_num,
cur_step_in_epoch, loss), flush=True)
def epoch_begin(self, run_context):
self.epoch_time = time.time()
@ -122,6 +125,9 @@ class LossGet(Callback):
def get_per_step_time(self):
return self._per_step_mseconds
def get_epoch(self):
return self._epoch
def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
os.system("mkdir " + str(device_id))
@ -137,7 +143,7 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
# network
net = resnet50_thor(thor_config.class_num)
net = resnet50(thor_config.class_num)
if not thor_config.label_smooth:
thor_config.label_smooth_factor = 0.0
@ -148,14 +154,10 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
# train dataset
dataset = create_dataset_thor(dataset_path=dataset_path, do_train=True,
repeat_num=1, batch_size=thor_config.batch_size)
batch_size=thor_config.batch_size, train_image_size=thor_config.train_image_size,
eval_image_size=thor_config.eval_image_size, target="Ascend",
step_size = dataset.get_dataset_size()
eval_interval = thor_config.eval_interval
# evaluation dataset
eval_dataset = create_dataset(dataset_path=eval_path, do_train=False,
repeat_num=1, batch_size=thor_config.eval_batch_size)
# loss scale
loss_scale = FixedLossScaleManager(thor_config.loss_scale, drop_overflow_update=False)
@ -171,90 +173,30 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
# evaluation network
dist_eval_network = ClassifyCorrectCell(net)
# model
model = THOR_Model(net, loss_fn=loss, optimizer=opt, loss_scale_manager=loss_scale, amp_level="O2",
metrics={'acc': DistAccuracy(batch_size=thor_config.eval_batch_size, device_num=device_num)},
eval_network=dist_eval_network, frequency=thor_config.frequency)
model = Model(net, loss_fn=loss, optimizer=opt, loss_scale_manager=loss_scale,
metrics={'acc': DistAccuracy(batch_size=thor_config.eval_batch_size, device_num=device_num)},
amp_level="O2", keep_batchnorm_fp32=False,
# model init
print("init_start", device_id)
model.init(dataset, eval_dataset)
print("init_stop", device_id)
model = ConvertModelUtils().convert_to_thor_model(model=model, network=net, loss_fn=loss, optimizer=opt,
loss_scale_manager=loss_scale, metrics={'acc'},
amp_level="O2", keep_batchnorm_fp32=False)
# callbacks
loss_cb = LossGet(1, step_size)
# train and eval
acc = 0.0
time_cost = 0.0
print("run_start", device_id)
for epoch_idx in range(0, int(epoch_size / eval_interval)):
model.train(eval_interval, dataset, callbacks=loss_cb)
eval_start = time.time()
output = model.eval(eval_dataset)
eval_cost = (time.time() - eval_start) * 1000
acc = float(output["acc"])
time_cost = loss_cb.get_per_step_time()
loss = loss_cb.get_loss()
print("the {} epoch's resnet result:\n "
"device{}, training loss {}, acc {}, "
"training per step cost {:.2f} ms, eval cost {:.2f} ms, total_cost {:.2f} ms".format(
epoch_idx, device_id, loss, acc, time_cost, eval_cost, time_cost * step_size + eval_cost))
q.put({'acc': acc, 'cost': time_cost})
def test_resnet_thor_imagenet_8p_0():
Feature: Resnet50 thor network
Description: Train and evaluate resnet50 thor network on imagenet dataset
Expectation: accuracy > 0.28, time cost < 25.
context.set_context(enable_graph_kernel=False, enable_sparse=False)
q = Queue()
# resnet50_thor
device_num = 8
epoch_size = 1
enable_hccl = True
process = []
for i in range(device_num):
device_id = i
args=(q, device_id, epoch_size, device_num, enable_hccl)))
cpu_count = os.cpu_count()
each_cpu_count = cpu_count // device_num
for i in range(device_num):
if each_cpu_count > 1:
cpu_start = each_cpu_count * i
cpu_end = each_cpu_count * (i + 1)
process_cpu = [x for x in range(cpu_start, cpu_end)]
pid = process[i].pid
os.sched_setaffinity(pid, set(process_cpu))
print("Waiting for all subprocesses done...")
for i in range(device_num):
thor_acc = 0.0
thor_cost = 0.0
for i in range(device_num):
output = q.get()
thor_acc += output['acc']
thor_cost += output['cost']
thor_acc = thor_acc / device_num
thor_cost = thor_cost / device_num
for i in range(0, device_num):
os.system("rm -rf " + str(i))
print("End training...")
assert thor_acc > 0.25
model.train(2, dataset, callbacks=loss_cb,
sink_size=dataset.get_dataset_size(), dataset_sink_mode=True)
time_cost = loss_cb.get_per_step_time()
loss = loss_cb.get_loss()
epoch_idx = loss_cb.get_epoch()
print("the {} epoch's resnet result:\n "
"device{}, training loss {}, "
"training per step cost {:.2f} ms, total_cost {:.2f} ms".format(epoch_idx, device_id,
loss, time_cost, time_cost * step_size))
q.put({'loss': loss, 'cost': time_cost})
@ -275,7 +217,7 @@ def test_resnet_thor_imagenet_8p_1():
# resnet50_thor
device_num = 8
epoch_size = 1
epoch_size = 2
enable_hccl = True
process = []
for i in range(device_num):
@ -300,19 +242,17 @@ def test_resnet_thor_imagenet_8p_1():
thor_acc = 0.0
thor_loss = 0.0
thor_cost = 0.0
for i in range(device_num):
output = q.get()
thor_acc += output['acc']
thor_loss += output['loss']
thor_cost += output['cost']
thor_acc = thor_acc / device_num
thor_loss = thor_loss / device_num
thor_cost = thor_cost / device_num
for i in range(0, device_num):
os.system("rm -rf " + str(i))
print("End training...")
print('thor acc: ', thor_acc)
print('thor cost: ', thor_cost)
#assert thor_acc > 0.25
#assert thor_cost < 30
assert thor_loss < 7
assert thor_cost < 30
Reference in New Issue