!9495 fix summary st, and support test dataset sink mode

From: @ouwenchang
Reviewed-by: 
Signed-off-by:
This commit is contained in:
mindspore-ci-bot 2020-12-07 10:16:39 +08:00 committed by Gitee
commit ae77170282
2 changed files with 109 additions and 82 deletions

View File

@ -18,88 +18,86 @@ import re
import tempfile
import shutil
from collections import Counter
import pytest
from mindspore import dataset as ds
from mindspore import nn, Tensor, context
from mindspore.nn.metrics import Accuracy
from mindspore.nn.metrics import Loss
from mindspore.nn.optim import Momentum
from mindspore.dataset.transforms import c_transforms as C
from mindspore.dataset.vision import c_transforms as CV
from mindspore.dataset.vision import Inter
from mindspore.common import dtype as mstype
from mindspore.common.initializer import TruncatedNormal
from mindspore.ops import operations as P
from mindspore.common.initializer import Normal
from mindspore.train import Model
from mindspore.train.callback import SummaryCollector
from tests.summary_utils import SummaryReader
def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
"""weight initial for conv layer"""
weight = weight_variable()
return nn.Conv2d(in_channels, out_channels,
kernel_size=kernel_size, stride=stride, padding=padding,
weight_init=weight, has_bias=False, pad_mode="valid")
def fc_with_initialize(input_channels, out_channels):
"""weight initial for fc layer"""
weight = weight_variable()
bias = weight_variable()
return nn.Dense(input_channels, out_channels, weight, bias)
def weight_variable():
"""weight initial"""
return TruncatedNormal(0.02)
class LeNet5(nn.Cell):
"""Define LeNet5 network."""
def __init__(self, num_class=10, channel=1):
"""
Lenet network
Args:
num_class (int): Number of classes. Default: 10.
num_channel (int): Number of channels. Default: 1.
Returns:
Tensor, output tensor
Examples:
>>> LeNet(num_class=10)
"""
def __init__(self, num_class=10, num_channel=1, include_top=True):
super(LeNet5, self).__init__()
self.num_class = num_class
self.conv1 = conv(channel, 6, 5)
self.conv2 = conv(6, 16, 5)
self.fc1 = fc_with_initialize(16 * 5 * 5, 120)
self.fc2 = fc_with_initialize(120, 84)
self.fc3 = fc_with_initialize(84, self.num_class)
self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid')
self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid')
self.relu = nn.ReLU()
self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
self.flatten = nn.Flatten()
self.include_top = include_top
if self.include_top:
self.flatten = nn.Flatten()
self.fc1 = nn.Dense(16 * 5 * 5, 120, weight_init=Normal(0.02))
self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02))
self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02))
self.scalar_summary = P.ScalarSummary()
self.image_summary = P.ImageSummary()
self.histogram_summary = P.HistogramSummary()
self.tensor_summary = P.TensorSummary()
self.channel = Tensor(channel)
self.channel = Tensor(num_channel)
def construct(self, data):
"""define construct."""
self.image_summary('image', data)
output = self.conv1(data)
self.histogram_summary('histogram', output)
output = self.relu(output)
self.tensor_summary('tensor', output)
output = self.max_pool2d(output)
output = self.conv2(output)
output = self.relu(output)
output = self.max_pool2d(output)
output = self.flatten(output)
output = self.fc1(output)
output = self.relu(output)
output = self.fc2(output)
output = self.relu(output)
output = self.fc3(output)
def construct(self, x):
self.image_summary('image', x)
x = self.conv1(x)
self.histogram_summary('histogram', x)
x = self.relu(x)
self.tensor_summary('tensor', x)
x = self.relu(x)
x = self.max_pool2d(x)
self.scalar_summary('scalar', self.channel)
return output
x = self.conv2(x)
x = self.relu(x)
x = self.max_pool2d(x)
if not self.include_top:
return x
x = self.flatten(x)
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
def create_dataset(data_path, batch_size=32, repeat_size=1, num_parallel_workers=1):
def create_dataset(data_path, num_samples=2):
"""create dataset for train or test"""
num_parallel_workers = 1
# define dataset
mnist_ds = ds.MnistDataset(data_path)
mnist_ds = ds.MnistDataset(data_path, num_samples=num_samples)
resize_height, resize_width = 32, 32
rescale = 1.0 / 255.0
@ -122,8 +120,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1, num_parallel_workers
# apply DatasetOps
mnist_ds = mnist_ds.shuffle(buffer_size=10000) # 10000 as in LeNet train script
mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)
mnist_ds = mnist_ds.repeat(repeat_size)
mnist_ds = mnist_ds.batch(batch_size=2, drop_remainder=True)
return mnist_ds
@ -136,6 +133,8 @@ class TestSummary:
@classmethod
def setup_class(cls):
"""Run before test this class."""
device_id = int(os.getenv('DEVICE_ID')) if os.getenv('DEVICE_ID') else 0
context.set_context(mode=context.GRAPH_MODE, device_id=device_id)
cls.base_summary_dir = tempfile.mkdtemp(suffix='summary')
@classmethod
@ -144,51 +143,77 @@ class TestSummary:
if os.path.exists(cls.base_summary_dir):
shutil.rmtree(cls.base_summary_dir)
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_summary_ascend(self):
"""Test summary ascend."""
context.set_context(mode=context.GRAPH_MODE)
self._run_network()
def _run_network(self, dataset_sink_mode=True):
def _run_network(self, dataset_sink_mode=False, num_samples=2):
lenet = LeNet5()
loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
optim = Momentum(lenet.trainable_params(), learning_rate=0.1, momentum=0.9)
model = Model(lenet, loss_fn=loss, optimizer=optim, metrics={'acc': Accuracy()})
model = Model(lenet, loss_fn=loss, optimizer=optim, metrics={'acc': Loss()})
summary_dir = tempfile.mkdtemp(dir=self.base_summary_dir)
summary_collector = SummaryCollector(summary_dir=summary_dir, collect_freq=1)
summary_collector = SummaryCollector(summary_dir=summary_dir, collect_freq=2)
ds_train = create_dataset(os.path.join(self.mnist_path, "train"))
ds_train = create_dataset(os.path.join(self.mnist_path, "train"), num_samples=num_samples)
model.train(1, ds_train, callbacks=[summary_collector], dataset_sink_mode=dataset_sink_mode)
ds_eval = create_dataset(os.path.join(self.mnist_path, "test"))
model.eval(ds_eval, dataset_sink_mode=dataset_sink_mode, callbacks=[summary_collector])
return summary_dir
self._check_summary_result(summary_dir)
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_onecard
def test_summary_with_sink_mode_false(self):
"""Test summary with sink mode false, and num samples is 64."""
summary_dir = self._run_network(num_samples=10)
tag_list = self._list_summary_tags(summary_dir)
expected_tag_set = {'conv1.weight/auto', 'conv2.weight/auto', 'fc1.weight/auto', 'fc1.bias/auto',
'fc2.weight/auto', 'input_data/auto', 'loss/auto',
'histogram', 'image', 'scalar', 'tensor'}
assert set(expected_tag_set) == set(tag_list)
# num samples is 10, batch size is 2, so step is 5, collect freq is 2,
# SummaryCollector will collect the first step and 2th, 4th step
tag_count = 3
for value in Counter(tag_list).values():
assert value == tag_count
@pytest.mark.level0
@pytest.mark.platform_x86_ascend_training
@pytest.mark.platform_arm_ascend_training
@pytest.mark.env_onecard
def test_summary_with_sink_mode_true(self):
"""Test summary with sink mode true, and num samples is 64."""
summary_dir = self._run_network(dataset_sink_mode=True, num_samples=10)
tag_list = self._list_summary_tags(summary_dir)
# There will not record input data when dataset sink mode is True
expected_tags = {'conv1.weight/auto', 'conv2.weight/auto', 'fc1.weight/auto', 'fc1.bias/auto',
'fc2.weight/auto', 'loss/auto', 'histogram', 'image', 'scalar', 'tensor'}
assert set(expected_tags) == set(tag_list)
tag_count = 1
for value in Counter(tag_list).values():
assert value == tag_count
@staticmethod
def _check_summary_result(summary_dir):
def _list_summary_tags(summary_dir):
summary_file_path = ''
for file in os.listdir(summary_dir):
if re.search("_MS", file):
summary_file_path = os.path.join(summary_dir, file)
break
assert summary_file_path
assert not summary_file_path
tags = list()
with SummaryReader(summary_file_path) as summary_reader:
tags = set()
# Read the event that record by SummaryCollector.begin
summary_reader.read_event()
summary_event = summary_reader.read_event()
for value in summary_event.summary.value:
tags.add(value.tag)
# There will not record input data when dataset sink mode is True
expected_tags = ['conv1.weight/auto', 'conv2.weight/auto', 'fc1.weight/auto', 'fc1.bias/auto',
'fc2.weight/auto', 'histogram', 'image', 'scalar', 'tensor']
assert set(expected_tags) == tags
while True:
summary_event = summary_reader.read_event()
if not summary_event:
break
for value in summary_event.summary.value:
tags.append(value.tag)
return tags

View File

@ -54,6 +54,8 @@ class SummaryReader:
"""Read next event."""
file_handler = self._file_handler
header = file_handler.read(_HEADER_SIZE)
if not header:
return None
data_len = struct.unpack('Q', header)[0]
# Ignore crc check.
file_handler.read(_HEADER_CRC_SIZE)