forked from mindspore-Ecosystem/mindspore
370 lines
14 KiB
Python
370 lines
14 KiB
Python
# Copyright 2019 Huawei Technologies Co., Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ==============================================================================
|
|
"""
|
|
Testing configuration manager
|
|
"""
|
|
import os
|
|
import filecmp
|
|
import glob
|
|
import numpy as np
|
|
|
|
import mindspore.dataset as ds
|
|
import mindspore.dataset.transforms.py_transforms
|
|
import mindspore.dataset.vision.c_transforms as c_vision
|
|
import mindspore.dataset.vision.py_transforms as py_vision
|
|
from mindspore import log as logger
|
|
from util import dataset_equal
|
|
|
|
# TFRecord file list handed to ds.TFRecordDataset by the tests in this module.
DATA_DIR = [
    "../data/dataset/test_tf_file_3_images/train-0000-of-0001.data",
]
# Schema file passed together with DATA_DIR when the datasets are constructed.
SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
|
|
|
|
|
|
def test_basic():
    """
    Test basic configuration functions

    Loads '../data/dataset/declient.cfg' and checks the values reported by the
    ds.config getters, then overrides each value through the setters and checks
    the getters again. The values captured on entry are restored on exit, even
    when an assertion fails, so a failure here cannot leak settings into the
    tests that run afterwards.
    """
    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    prefetch_size_original = ds.config.get_prefetch_size()
    seed_original = ds.config.get_seed()
    monitor_sampling_interval_original = ds.config.get_monitor_sampling_interval()

    try:
        ds.config.load('../data/dataset/declient.cfg')

        # NOTE: the rows_per_buffer / worker_connector_size checks are kept disabled.
        # assert ds.config.get_rows_per_buffer() == 32
        assert ds.config.get_num_parallel_workers() == 4
        # assert ds.config.get_worker_connector_size() == 16
        assert ds.config.get_prefetch_size() == 16
        assert ds.config.get_seed() == 5489
        assert ds.config.get_monitor_sampling_interval() == 15

        # ds.config.set_rows_per_buffer(1)
        ds.config.set_num_parallel_workers(2)
        # ds.config.set_worker_connector_size(3)
        ds.config.set_prefetch_size(4)
        ds.config.set_seed(5)
        ds.config.set_monitor_sampling_interval(45)

        # assert ds.config.get_rows_per_buffer() == 1
        assert ds.config.get_num_parallel_workers() == 2
        # assert ds.config.get_worker_connector_size() == 3
        assert ds.config.get_prefetch_size() == 4
        assert ds.config.get_seed() == 5
        assert ds.config.get_monitor_sampling_interval() == 45
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_prefetch_size(prefetch_size_original)
        ds.config.set_seed(seed_original)
        ds.config.set_monitor_sampling_interval(monitor_sampling_interval_original)
|
|
|
|
|
|
def test_get_seed():
    """
    Read the seed without setting one first; the returned value must be an int.
    """
    current_seed = ds.config.get_seed()
    assert isinstance(current_seed, int)
|
|
|
|
|
|
def test_pipeline():
    """
    Test that our configuration pipeline works when we set parameters at different locations in dataset code

    Two pipelines are serialized to JSON: one built without an explicit
    num_parallel_workers argument and one that passes the currently configured
    value explicitly. The two files are expected to have identical contents.
    The generated files are removed and the configuration is restored even when
    the test fails.
    """
    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()

    try:
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
        data1 = data1.map(operations=[c_vision.Decode(True)], input_columns=["image"])
        ds.serialize(data1, "testpipeline.json")

        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_parallel_workers=num_parallel_workers_original,
                                   shuffle=False)
        data2 = data2.map(operations=[c_vision.Decode(True)], input_columns=["image"])
        ds.serialize(data2, "testpipeline2.json")

        # check that the generated output is the same; shallow=False forces a content
        # comparison instead of trusting matching os.stat() signatures
        assert filecmp.cmp('testpipeline.json', 'testpipeline2.json', shallow=False)

        # this test passes currently because our num_parallel_workers don't get updated.
    finally:
        # remove only the json files generated by this test, never unrelated *.json
        # files that happen to live in the working directory
        for f in glob.glob('testpipeline*.json'):
            try:
                os.remove(f)
            except IOError:
                logger.info("Error while deleting: {}".format(f))

        # Restore original configuration values
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
|
|
|
|
|
|
def test_deterministic_run_fail():
    """
    Test RandomCrop with seed, expected to fail

    The same RandomCrop op instance is mapped onto both datasets. An exception
    raised by dataset_equal is tolerated as long as its message contains
    "Array". The seed and num_parallel_workers captured on entry are restored
    on exit, even when the check on the message fails.
    """
    logger.info("test_deterministic_run_fail")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        # when we set the seed all operations within our dataset should be deterministic
        ds.config.set_seed(0)
        ds.config.set_num_parallel_workers(1)
        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        # Assuming we get the same seed on calling constructor, if this op is re-used then result won't be
        # the same in between the two datasets. For example, RandomCrop constructor takes seed (0)
        # outputs a deterministic series of numbers, e.g. "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
        random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
        decode_op = c_vision.Decode()
        data1 = data1.map(operations=decode_op, input_columns=["image"])
        data1 = data1.map(operations=random_crop_op, input_columns=["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        data2 = data2.map(operations=decode_op, input_columns=["image"])
        # If seed is set up on constructor
        data2 = data2.map(operations=random_crop_op, input_columns=["image"])

        try:
            dataset_equal(data1, data2, 0)

        except Exception as e:
            # two datasets split the number out of the sequence a
            logger.info("Got an exception in DE: {}".format(str(e)))
            assert "Array" in str(e)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
def test_seed_undeterministic():
    """
    Test seed with num parallel workers in c, this test is expected to fail some of the time

    Each dataset gets its own RandomCrop op, and num_parallel_workers is set to 3.
    An exception raised by dataset_equal is tolerated as long as its message
    contains "Array". The configuration captured on entry is restored on exit,
    even when the check on the message fails.
    """
    logger.info("test_seed_undeterministic")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        ds.config.set_seed(0)
        ds.config.set_num_parallel_workers(3)

        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        # We get the seed when constructor is called
        random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
        decode_op = c_vision.Decode()
        data1 = data1.map(operations=decode_op, input_columns=["image"])
        data1 = data1.map(operations=random_crop_op, input_columns=["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        data2 = data2.map(operations=decode_op, input_columns=["image"])
        # Since seed is set up on constructor, so the two ops output deterministic sequence.
        # Assume the generated random sequence "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
        random_crop_op2 = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
        data2 = data2.map(operations=random_crop_op2, input_columns=["image"])
        try:
            dataset_equal(data1, data2, 0)
        except Exception as e:
            # two datasets both use numbers from the generated sequence "a"
            logger.info("Got an exception in DE: {}".format(str(e)))
            assert "Array" in str(e)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
def test_seed_deterministic():
    """
    Test deterministic run with setting the seed, only works with num_parallel worker = 1

    Each dataset gets its own RandomCrop op constructed after the seed is set to 0,
    and the two pipelines are compared with dataset_equal. The configuration
    captured on entry is restored on exit, even when the comparison fails.
    """
    logger.info("test_seed_deterministic")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        ds.config.set_seed(0)
        ds.config.set_num_parallel_workers(1)

        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        # seed will be read in during constructor call
        random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
        decode_op = c_vision.Decode()
        data1 = data1.map(operations=decode_op, input_columns=["image"])
        data1 = data1.map(operations=random_crop_op, input_columns=["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        data2 = data2.map(operations=decode_op, input_columns=["image"])
        # If seed is set up on constructor, so the two ops output deterministic sequence
        random_crop_op2 = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
        data2 = data2.map(operations=random_crop_op2, input_columns=["image"])

        dataset_equal(data1, data2, 0)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
def test_deterministic_run_distribution():
    """
    Test deterministic run with setting the seed being used in a distribution

    Each dataset gets its own RandomHorizontalFlip(0.1) op constructed after the
    seed is set to 0, and the two pipelines are compared with dataset_equal. The
    configuration captured on entry is restored on exit, even when the
    comparison fails.
    """
    logger.info("test_deterministic_run_distribution")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        # when we set the seed all operations within our dataset should be deterministic
        ds.config.set_seed(0)
        ds.config.set_num_parallel_workers(1)

        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        random_horizontal_flip_op = c_vision.RandomHorizontalFlip(0.1)
        decode_op = c_vision.Decode()
        data1 = data1.map(operations=decode_op, input_columns=["image"])
        data1 = data1.map(operations=random_horizontal_flip_op, input_columns=["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        data2 = data2.map(operations=decode_op, input_columns=["image"])
        # If seed is set up on constructor, so the two ops output deterministic sequence
        random_horizontal_flip_op2 = c_vision.RandomHorizontalFlip(0.1)
        data2 = data2.map(operations=random_horizontal_flip_op2, input_columns=["image"])

        dataset_equal(data1, data2, 0)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
def test_deterministic_python_seed():
    """
    Test deterministic execution with seed in python

    The same composed python transform (Decode, RandomCrop, ToTensor) is mapped
    onto two datasets; the seed is set to 0 again before the second dataset is
    iterated, and the collected images are compared with np.testing.assert_equal.
    The configuration captured on entry is restored on exit, even when the
    comparison fails.
    """
    logger.info("test_deterministic_python_seed")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        ds.config.set_seed(0)
        ds.config.set_num_parallel_workers(1)

        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)

        transforms = [
            py_vision.Decode(),
            py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
            py_vision.ToTensor(),
        ]
        transform = mindspore.dataset.transforms.py_transforms.Compose(transforms)
        data1 = data1.map(operations=transform, input_columns=["image"])
        data1_output = []
        # config.set_seed() calls random.seed()
        for data_one in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
            data1_output.append(data_one["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        data2 = data2.map(operations=transform, input_columns=["image"])
        # config.set_seed() calls random.seed(), resets seed for next dataset iterator
        ds.config.set_seed(0)

        data2_output = []
        for data_two in data2.create_dict_iterator(num_epochs=1, output_numpy=True):
            data2_output.append(data_two["image"])

        np.testing.assert_equal(data1_output, data2_output)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
def test_deterministic_python_seed_multi_thread():
    """
    Test deterministic execution with seed in python, this fails with multi-thread pyfunc run

    The same composed python transform is mapped onto two datasets with
    python_multiprocessing=True and num_parallel_workers set to 3. A mismatch
    reported by np.testing.assert_equal is tolerated as long as its message
    contains "Array". The configuration captured on entry is restored on exit,
    even when the check on the message fails.
    """
    logger.info("test_deterministic_python_seed_multi_thread")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    try:
        ds.config.set_num_parallel_workers(3)
        ds.config.set_seed(0)
        # when we set the seed all operations within our dataset should be deterministic
        # First dataset
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        transforms = [
            py_vision.Decode(),
            py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
            py_vision.ToTensor(),
        ]
        transform = mindspore.dataset.transforms.py_transforms.Compose(transforms)
        data1 = data1.map(operations=transform, input_columns=["image"], python_multiprocessing=True)
        data1_output = []
        # config.set_seed() calls random.seed()
        for data_one in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
            data1_output.append(data_one["image"])

        # Second dataset
        data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
        # If seed is set up on constructor
        data2 = data2.map(operations=transform, input_columns=["image"], python_multiprocessing=True)
        # config.set_seed() calls random.seed()
        ds.config.set_seed(0)

        data2_output = []
        for data_two in data2.create_dict_iterator(num_epochs=1, output_numpy=True):
            data2_output.append(data_two["image"])

        try:
            np.testing.assert_equal(data1_output, data2_output)
        except Exception as e:
            # expect output to not match during multi-threaded execution
            logger.info("Got an exception in DE: {}".format(str(e)))
            assert "Array" in str(e)
    finally:
        # Restore original configuration values (runs on success and on failure)
        ds.config.set_num_parallel_workers(num_parallel_workers_original)
        ds.config.set_seed(seed_original)
|
|
|
|
|
|
if __name__ == '__main__':
    # When run as a script, execute every test of this module in declaration order.
    for run_test in (
            test_basic,
            test_get_seed,
            test_pipeline,
            test_deterministic_run_fail,
            test_seed_undeterministic,
            test_seed_deterministic,
            test_deterministic_run_distribution,
            test_deterministic_python_seed,
            test_deterministic_python_seed_multi_thread,
    ):
        run_test()
|