forked from mindspore-Ecosystem/mindspore
!32634 add testcase: when return numpy in GeneratorDataset
Merge pull request !32634 from guozhijian/add_testcase
This commit is contained in:
commit
dff6b9b0fc
|
@ -1367,6 +1367,652 @@ def test_generator_single_input_6():
|
|||
assert_generator_single_input_6(SequentialAccessDatasetInner())
|
||||
|
||||
|
||||
def test_generator_with_single_numpy():
    """
    Feature: GeneratorDataset whose random-access source (__getitem__) returns NumPy data
    Description: Return a single NumPy array, or a tuple of NumPy arrays, from the source while declaring
                 one, two or three column names
    Expectation: Iteration yields 20 matching rows when the number of returned arrays equals the number of
                 declared columns; otherwise a RuntimeError reporting both counts is raised
    """
    class get_dataset_generator:
        """Random-access source of 20 rows; every row is the value given at construction."""

        def __init__(self, value):
            # NOTE(review): reseeding on every construction means each np.random.randint() call that
            # follows a construction repeats the same draw, so the "random" worker count is effectively
            # fixed after the first helper call. Kept as-is so the exercised pipelines do not change;
            # confirm whether seeding once at the top of the test was the intent.
            np.random.seed(58)
            self.__value = value

        def __getitem__(self, index):
            return self.__value

        def __len__(self):
            return 20

    def build_with_random_workers(value, column_names):
        """Build a dataset over `value` with a worker count drawn from {1, 2, 3}."""
        number = np.random.randint(1, 4)
        # Multiprocessing is switched on only for an even worker count greater than one.
        process_flag = bool(number > 1 and number % 2 == 0)
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False,
                                   num_parallel_workers=number, python_multiprocessing=process_flag)

    def build_default(value, column_names):
        """Build a dataset over `value` with the default worker configuration."""
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False)

    def check_rows(dataset, expected):
        """Assert that `dataset` yields exactly 20 rows whose columns equal the arrays in `expected`."""
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            for name, array in expected.items():
                assert (data[name] == array).all()
            count += 1
        assert count == 20

    def check_mismatch(build, column_count, returned_count):
        """Assert that building/iterating the dataset raises the column-count mismatch RuntimeError."""
        with pytest.raises(RuntimeError) as info:
            dataset = build()
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
        message = str(info.value)
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in message
        detail = "the size of column_names is:{} and number of returned NumPy array is:{}".format(
            column_count, returned_count)
        assert detail in message

    def check_single_value_mismatch(value, column_names):
        """A source returning one array must fail when more than one column is declared."""
        # The dataset is built outside pytest.raises so that only iteration may raise.
        dataset = build_with_random_workers(value, column_names)
        check_mismatch(lambda: dataset, len(column_names), 1)

    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])
    single_arrays = (numpy_1, numpy_2, numpy_3, numpy_4, numpy_5,
                     numpy_6, numpy_7, numpy_8, numpy_9, numpy_10)
    # Every value that the source returns as exactly one NumPy array.
    single_values = single_arrays + ((numpy_7,),)

    one_column = ("data",)
    two_columns = ("data", "label")
    three_columns = ("data", "label", "label2")

    # test user define one column
    for array in single_arrays:
        check_rows(build_with_random_workers(array, one_column), {"data": array})
    check_rows(build_default((numpy_7,), one_column), {"data": numpy_7})
    check_mismatch(lambda: build_default((numpy_6, numpy_7), one_column), 1, 2)
    # Same case as covered by the __next__ and yield variants of this test.
    check_mismatch(lambda: build_default((numpy_1, numpy_2), one_column), 1, 2)
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6, numpy_7), one_column), 1, 4)

    # test user define two column
    for value in single_values:
        check_single_value_mismatch(value, two_columns)
    check_rows(build_default((numpy_2, numpy_3), two_columns), {"data": numpy_2, "label": numpy_3})
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6), two_columns), 2, 3)

    # test user define three column
    for value in single_values:
        check_single_value_mismatch(value, three_columns)
    check_mismatch(lambda: build_default((numpy_2, numpy_3), three_columns), 3, 2)
    check_rows(build_default((numpy_4, numpy_5, numpy_6), three_columns),
               {"data": numpy_4, "label": numpy_5, "label2": numpy_6})
|
||||
|
||||
|
||||
def test_generator_with_single_numpy_with_next():
    """
    Feature: GeneratorDataset whose iterator source (__iter__/__next__) returns NumPy data
    Description: Return a single NumPy array, or a tuple of NumPy arrays, from the source while declaring
                 one, two or three column names
    Expectation: Iteration yields 20 matching rows when the number of returned arrays equals the number of
                 declared columns; otherwise a RuntimeError reporting both counts is raised
    """
    class get_dataset_generator:
        """Iterator source of 20 rows; every row is the value given at construction."""

        def __init__(self, value):
            # NOTE(review): reseeding on every construction means each np.random.randint() call that
            # follows a construction repeats the same draw, so the "random" worker count is effectively
            # fixed after the first helper call. Kept as-is so the exercised pipelines do not change;
            # confirm whether seeding once at the top of the test was the intent.
            np.random.seed(58)
            self.__value = value
            self.__index = 0

        def __next__(self):
            if self.__index >= 20:
                raise StopIteration

            self.__index += 1
            return self.__value

        def __iter__(self):
            # Restart from the first row each time a new iteration begins.
            self.__index = 0
            return self

        def __len__(self):
            return 20

    def build_with_random_workers(value, column_names):
        """Build a dataset over `value` with a worker count drawn from {1, 2, 3}."""
        number = np.random.randint(1, 4)
        # Multiprocessing is switched on only for an even worker count greater than one.
        process_flag = bool(number > 1 and number % 2 == 0)
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False,
                                   num_parallel_workers=number, python_multiprocessing=process_flag)

    def build_default(value, column_names):
        """Build a dataset over `value` with the default worker configuration."""
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False)

    def check_rows(dataset, expected):
        """Assert that `dataset` yields exactly 20 rows whose columns equal the arrays in `expected`."""
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            for name, array in expected.items():
                assert (data[name] == array).all()
            count += 1
        assert count == 20

    def check_mismatch(build, column_count, returned_count):
        """Assert that building/iterating the dataset raises the column-count mismatch RuntimeError."""
        with pytest.raises(RuntimeError) as info:
            dataset = build()
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
        message = str(info.value)
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in message
        detail = "the size of column_names is:{} and number of returned NumPy array is:{}".format(
            column_count, returned_count)
        assert detail in message

    def check_single_value_mismatch(value, column_names):
        """A source returning one array must fail when more than one column is declared."""
        # The dataset is built outside pytest.raises so that only iteration may raise.
        dataset = build_with_random_workers(value, column_names)
        check_mismatch(lambda: dataset, len(column_names), 1)

    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])
    single_arrays = (numpy_1, numpy_2, numpy_3, numpy_4, numpy_5,
                     numpy_6, numpy_7, numpy_8, numpy_9, numpy_10)
    # Every value that the source returns as exactly one NumPy array.
    single_values = single_arrays + ((numpy_7,),)

    one_column = ("data",)
    two_columns = ("data", "label")
    three_columns = ("data", "label", "label2")

    # test user define one column
    for array in single_arrays:
        check_rows(build_with_random_workers(array, one_column), {"data": array})
    check_rows(build_default((numpy_7,), one_column), {"data": numpy_7})
    check_mismatch(lambda: build_default((numpy_6, numpy_7), one_column), 1, 2)
    check_mismatch(lambda: build_default((numpy_1, numpy_2), one_column), 1, 2)
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6, numpy_7), one_column), 1, 4)

    # test user define two column
    for value in single_values:
        check_single_value_mismatch(value, two_columns)
    check_rows(build_default((numpy_2, numpy_3), two_columns), {"data": numpy_2, "label": numpy_3})
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6), two_columns), 2, 3)

    # test user define three column
    for value in single_values:
        check_single_value_mismatch(value, three_columns)
    check_mismatch(lambda: build_default((numpy_2, numpy_3), three_columns), 3, 2)
    check_rows(build_default((numpy_4, numpy_5, numpy_6), three_columns),
               {"data": numpy_4, "label": numpy_5, "label2": numpy_6})
|
||||
|
||||
|
||||
def test_generator_with_single_numpy_with_yield():
    """
    Feature: GeneratorDataset whose generator-function source (yield) returns NumPy data
    Description: Yield a single NumPy array, or a tuple of NumPy arrays, from the source while declaring
                 one, two or three column names
    Expectation: Iteration yields 20 matching rows when the number of yielded arrays equals the number of
                 declared columns; otherwise a RuntimeError reporting both counts is raised
    """
    def get_dataset_generator(value):
        """Yield `value` 20 times."""
        for _ in range(20):
            yield value

    def build_with_random_workers(value, column_names):
        """Build a dataset over `value` with a worker count drawn from {1, 2, 3}."""
        # NOTE(review): no seed is set in this test, so this draw depends on whatever global NumPy RNG
        # state earlier code left behind; the worker configuration is not reproducible on its own.
        number = np.random.randint(1, 4)
        # Multiprocessing is switched on only for an even worker count greater than one.
        process_flag = bool(number > 1 and number % 2 == 0)
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False,
                                   num_parallel_workers=number, python_multiprocessing=process_flag)

    def build_default(value, column_names):
        """Build a dataset over `value` with the default worker configuration."""
        dataset_generator = get_dataset_generator(value)
        return ds.GeneratorDataset(dataset_generator, list(column_names), shuffle=False)

    def check_rows(dataset, expected):
        """Assert that `dataset` yields exactly 20 rows whose columns equal the arrays in `expected`."""
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            for name, array in expected.items():
                assert (data[name] == array).all()
            count += 1
        assert count == 20

    def check_mismatch(build, column_count, returned_count):
        """Assert that building/iterating the dataset raises the column-count mismatch RuntimeError."""
        with pytest.raises(RuntimeError) as info:
            dataset = build()
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
        message = str(info.value)
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in message
        detail = "the size of column_names is:{} and number of returned NumPy array is:{}".format(
            column_count, returned_count)
        assert detail in message

    def check_single_value_mismatch(value, column_names):
        """A source yielding one array must fail when more than one column is declared."""
        # The dataset is built outside pytest.raises so that only iteration may raise.
        dataset = build_with_random_workers(value, column_names)
        check_mismatch(lambda: dataset, len(column_names), 1)

    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])
    single_arrays = (numpy_1, numpy_2, numpy_3, numpy_4, numpy_5,
                     numpy_6, numpy_7, numpy_8, numpy_9, numpy_10)
    # Every value that the source yields as exactly one NumPy array.
    single_values = single_arrays + ((numpy_7,),)

    one_column = ("data",)
    two_columns = ("data", "label")
    three_columns = ("data", "label", "label2")

    # test user define one column
    for array in single_arrays:
        check_rows(build_with_random_workers(array, one_column), {"data": array})
    check_rows(build_default((numpy_7,), one_column), {"data": numpy_7})
    check_mismatch(lambda: build_default((numpy_6, numpy_7), one_column), 1, 2)
    check_mismatch(lambda: build_default((numpy_1, numpy_2), one_column), 1, 2)
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6, numpy_7), one_column), 1, 4)

    # test user define two column
    for value in single_values:
        check_single_value_mismatch(value, two_columns)
    check_rows(build_default((numpy_2, numpy_3), two_columns), {"data": numpy_2, "label": numpy_3})
    check_mismatch(lambda: build_default((numpy_4, numpy_5, numpy_6), two_columns), 2, 3)

    # test user define three column
    for value in single_values:
        check_single_value_mismatch(value, three_columns)
    check_mismatch(lambda: build_default((numpy_2, numpy_3), three_columns), 3, 2)
    check_rows(build_default((numpy_4, numpy_5, numpy_6), three_columns),
               {"data": numpy_4, "label": numpy_5, "label2": numpy_6})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_generator_0()
|
||||
test_generator_1()
|
||||
|
@ -1415,3 +2061,6 @@ if __name__ == "__main__":
|
|||
test_generator_single_input_4()
|
||||
test_generator_single_input_5()
|
||||
test_generator_single_input_6()
|
||||
test_generator_with_single_numpy()
|
||||
test_generator_with_single_numpy_with_next()
|
||||
test_generator_with_single_numpy_with_yield()
|
||||
|
|
Loading…
Reference in New Issue