mindspore/tests/ut/python/dataset/test_pyfunc.py

298 lines
10 KiB
Python

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
import pytest
import mindspore.dataset as ds
from mindspore import log as logger
DATA_DIR = ["../data/dataset/testPyfuncMap/data.data"]
SCHEMA_DIR = "../data/dataset/testPyfuncMap/schema.json"
COLUMNS = ["col0", "col1", "col2"]
GENERATE_GOLDEN = False
def test_case_0():
"""
Test PyFunc
"""
logger.info("Test 1-1 PyFunc : lambda x : x + x")
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=(lambda x: x + x))
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out"], golden)
i = i + 4
def test_case_1():
"""
Test PyFunc
"""
logger.info("Test 1-n PyFunc : lambda x : (x , x + x) ")
col = "col0"
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns=col, output_columns=["out0", "out1"], operations=(lambda x: (x, x + x)),
columns_order=["out0", "out1"])
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i, i + 1], [i + 2, i + 3]])
assert np.array_equal(item["out0"], golden)
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out1"], golden)
i = i + 4
def test_case_2():
"""
Test PyFunc
"""
logger.info("Test n-1 PyFunc : lambda x, y : x + y ")
col = ["col0", "col1"]
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns=col, output_columns="out", operations=(lambda x, y: x + y),
columns_order=["out"])
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out"], golden)
i = i + 4
def test_case_3():
"""
Test PyFunc
"""
logger.info("Test n-m PyFunc : lambda x, y : (x , x + 1, x + y)")
col = ["col0", "col1"]
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns=col, output_columns=["out0", "out1", "out2"],
operations=(lambda x, y: (x, x + y, x + y + 1)), columns_order=["out0", "out1", "out2"])
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i, i + 1], [i + 2, i + 3]])
assert np.array_equal(item["out0"], golden)
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out1"], golden)
golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
assert np.array_equal(item["out2"], golden)
i = i + 4
def test_case_4():
"""
Test PyFunc
"""
logger.info("Test Parallel n-m PyFunc : lambda x, y : (x , x + 1, x + y)")
col = ["col0", "col1"]
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns=col, output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
operations=(lambda x, y: (x, x + y, x + y + 1)), columns_order=["out0", "out1", "out2"])
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i, i + 1], [i + 2, i + 3]])
assert np.array_equal(item["out0"], golden)
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out1"], golden)
golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
assert np.array_equal(item["out2"], golden)
i = i + 4
# The execution of this function will acquire GIL
def func_5(x):
return np.ones(x.shape, dtype=x.dtype)
def test_case_5():
"""
Test PyFunc
"""
logger.info("Test 1-1 PyFunc : lambda x: np.ones(x.shape)")
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=func_5)
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[1, 1], [1, 1]])
assert np.array_equal(item["out"], golden)
def test_case_6():
"""
Test PyFunc
"""
logger.info("Test PyFunc ComposeOp : (lambda x : x + x), (lambda x : x + x)")
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out",
operations=[(lambda x: x + x), (lambda x: x + x)])
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i * 4, (i + 1) * 4], [(i + 2) * 4, (i + 3) * 4]])
assert np.array_equal(item["out"], golden)
i = i + 4
def test_case_7():
"""
Test PyFunc
"""
logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x")
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=(lambda x: x + x),
num_parallel_workers=4, python_multiprocessing=True)
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out"], golden)
i = i + 4
def test_case_8():
"""
Test PyFunc
"""
logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)")
col = ["col0", "col1"]
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns=col, output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
operations=(lambda x, y: (x, x + y, x + y + 1)), columns_order=["out0", "out1", "out2"],
python_multiprocessing=True)
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i, i + 1], [i + 2, i + 3]])
assert np.array_equal(item["out0"], golden)
golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
assert np.array_equal(item["out1"], golden)
golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
assert np.array_equal(item["out2"], golden)
i = i + 4
def test_case_9():
"""
Test PyFunc
"""
logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x")
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=[(lambda x: x + x), (lambda x: x + 1),
(lambda x: x + 2)],
num_parallel_workers=4, python_multiprocessing=True)
i = 0
for item in data1.create_dict_iterator(): # each data is a dictionary
# In this test, the dataset is 2x2 sequential tensors
golden = np.array([[i * 2 + 3, (i + 1) * 2 + 3], [(i + 2) * 2 + 3, (i + 3) * 2 + 3]])
assert np.array_equal(item["out"], golden)
i = i + 4
def test_pyfunc_execption():
logger.info("Test PyFunc Execption Throw: lambda x : raise Execption()")
def pyfunc(x):
raise Exception("Pyfunc Throw")
with pytest.raises(RuntimeError) as info:
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=pyfunc,
num_parallel_workers=4)
for _ in data1:
pass
assert "Pyfunc Throw" in str(info.value)
def skip_test_pyfunc_execption_multiprocess():
logger.info("Test Multiprocess PyFunc Execption Throw: lambda x : raise Execption()")
def pyfunc(x):
raise Exception("MP Pyfunc Throw")
with pytest.raises(RuntimeError) as info:
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(input_columns="col0", output_columns="out", operations=pyfunc,
num_parallel_workers=4, python_multiprocessing=True)
for _ in data1:
pass
assert "MP Pyfunc Throw" in str(info.value)
if __name__ == "__main__":
test_case_0()
test_case_1()
test_case_2()
test_case_3()
test_case_4()
test_case_5()
test_case_6()
test_case_7()
test_case_8()
test_case_9()
test_pyfunc_execption()
skip_test_pyfunc_execption_multiprocess()