Add a check for available shared memory (shm) when num_devices=1 to avoid bus errors

- Required modifying tests to reduce memory usage so the check doesn't fail during CI
- Also required externalizing max_rowsize as a parameter of the map() and batch() functions
This commit is contained in:
RobinGrosman 2021-09-24 10:24:31 -07:00
parent 1e15b00a31
commit c90efc0403
6 changed files with 71 additions and 13 deletions

View File

@ -428,7 +428,8 @@ class Dataset:
@check_batch @check_batch
def batch(self, batch_size, drop_remainder=False, num_parallel_workers=None, per_batch_map=None, def batch(self, batch_size, drop_remainder=False, num_parallel_workers=None, per_batch_map=None,
input_columns=None, output_columns=None, column_order=None, pad_info=None, python_multiprocessing=False): input_columns=None, output_columns=None, column_order=None, pad_info=None,
python_multiprocessing=False, max_rowsize=16):
""" """
Combine batch_size number of consecutive rows into batches. Combine batch_size number of consecutive rows into batches.
@ -470,6 +471,8 @@ class Dataset:
(default=None). (default=None).
python_multiprocessing (bool, optional): Parallelize Python function per_batch_map with multi-processing. python_multiprocessing (bool, optional): Parallelize Python function per_batch_map with multi-processing.
This option could be beneficial if the function is computational heavy (default=False). This option could be beneficial if the function is computational heavy (default=False).
max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default=16).
Returns: Returns:
BatchDataset, dataset batched. BatchDataset, dataset batched.
@ -492,7 +495,7 @@ class Dataset:
>>> dataset = dataset.batch(batch_size=8, input_columns=["image"], per_batch_map=np_resize) >>> dataset = dataset.batch(batch_size=8, input_columns=["image"], per_batch_map=np_resize)
""" """
return BatchDataset(self, batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns, return BatchDataset(self, batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns,
output_columns, column_order, pad_info, python_multiprocessing) output_columns, column_order, pad_info, python_multiprocessing, max_rowsize)
@check_sync_wait @check_sync_wait
def sync_wait(self, condition_name, num_batch=1, callback=None): def sync_wait(self, condition_name, num_batch=1, callback=None):
@ -626,7 +629,7 @@ class Dataset:
@check_map @check_map
def map(self, operations, input_columns=None, output_columns=None, column_order=None, def map(self, operations, input_columns=None, output_columns=None, column_order=None,
num_parallel_workers=None, python_multiprocessing=False, cache=None, callbacks=None): num_parallel_workers=None, python_multiprocessing=False, cache=None, callbacks=None, max_rowsize=16):
""" """
Apply each operation in operations to this dataset. Apply each operation in operations to this dataset.
@ -666,6 +669,8 @@ class Dataset:
cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing. cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing.
(default=None, which means no cache is used). (default=None, which means no cache is used).
callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None). callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None).
max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default=16).
Returns: Returns:
@ -759,7 +764,7 @@ class Dataset:
""" """
return MapDataset(self, operations, input_columns, output_columns, column_order, num_parallel_workers, return MapDataset(self, operations, input_columns, output_columns, column_order, num_parallel_workers,
python_multiprocessing, cache, callbacks) python_multiprocessing, cache, callbacks, max_rowsize)
@check_filter @check_filter
def filter(self, predicate, input_columns=None, num_parallel_workers=None): def filter(self, predicate, input_columns=None, num_parallel_workers=None):
@ -2085,7 +2090,7 @@ class BatchDataset(Dataset):
pad_info (dict, optional): Whether to perform padding on selected columns. pad_info={"col1":([224,224],0)} pad_info (dict, optional): Whether to perform padding on selected columns. pad_info={"col1":([224,224],0)}
will pad column with name "col1" to a tensor of size [224,224] and fill the missing with 0. will pad column with name "col1" to a tensor of size [224,224] and fill the missing with 0.
max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default 16 MB). data between processes. This is only used if python_multiprocessing is set to True (default=16).
""" """
@ -2638,7 +2643,7 @@ class MapDataset(Dataset):
(default=None, which means no cache is used). (default=None, which means no cache is used).
callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None) callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None)
max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy max_rowsize(int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default 16 MB). data between processes. This is only used if python_multiprocessing is set to True (default=16).
Raises: Raises:
ValueError: If len(input_columns) != len(output_columns) and column_order is not specified. ValueError: If len(input_columns) != len(output_columns) and column_order is not specified.
@ -3646,7 +3651,7 @@ def _check_shm_usage(num_worker, queue_size, max_rowsize, num_queues=1):
when training in parallel mode. when training in parallel mode.
""" """
threshold_ratio = 0.8 threshold_ratio = 0.8
if platform.system() != "Windows" and _get_device_num() > 1: if platform.system() != "Windows":
shm_estimate_usage = _get_device_num() * num_worker * num_queues * \ shm_estimate_usage = _get_device_num() * num_worker * num_queues * \
(queue_size + 2) * max_rowsize * 1024 * 1024 (queue_size + 2) * max_rowsize * 1024 * 1024
try: try:

View File

@ -597,8 +597,9 @@ def check_batch(method):
@wraps(method) @wraps(method)
def new_method(self, *args, **kwargs): def new_method(self, *args, **kwargs):
[batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns, output_columns, [batch_size, drop_remainder, num_parallel_workers, per_batch_map,
column_order, pad_info, python_multiprocessing], param_dict = parse_user_args(method, *args, **kwargs) input_columns, output_columns, column_order, pad_info,
python_multiprocessing, max_rowsize], param_dict = parse_user_args(method, *args, **kwargs)
if not (isinstance(batch_size, int) or (callable(batch_size))): if not (isinstance(batch_size, int) or (callable(batch_size))):
raise TypeError("batch_size should either be an int or a callable.") raise TypeError("batch_size should either be an int or a callable.")
@ -613,6 +614,7 @@ def check_batch(method):
if num_parallel_workers is not None: if num_parallel_workers is not None:
check_num_parallel_workers(num_parallel_workers) check_num_parallel_workers(num_parallel_workers)
type_check(drop_remainder, (bool,), "drop_remainder") type_check(drop_remainder, (bool,), "drop_remainder")
type_check(max_rowsize, (int,), "max_rowsize")
if (pad_info is not None) and (per_batch_map is not None): if (pad_info is not None) and (per_batch_map is not None):
raise ValueError("pad_info and per_batch_map can't both be set.") raise ValueError("pad_info and per_batch_map can't both be set.")
@ -683,7 +685,7 @@ def check_map(method):
def new_method(self, *args, **kwargs): def new_method(self, *args, **kwargs):
from mindspore.dataset.callback import DSCallback from mindspore.dataset.callback import DSCallback
[_, input_columns, output_columns, column_order, num_parallel_workers, python_multiprocessing, cache, [_, input_columns, output_columns, column_order, num_parallel_workers, python_multiprocessing, cache,
callbacks], _ = \ callbacks, max_rowsize], _ = \
parse_user_args(method, *args, **kwargs) parse_user_args(method, *args, **kwargs)
nreq_param_columns = ['input_columns', 'output_columns', 'column_order'] nreq_param_columns = ['input_columns', 'output_columns', 'column_order']
@ -694,6 +696,7 @@ def check_map(method):
check_num_parallel_workers(num_parallel_workers) check_num_parallel_workers(num_parallel_workers)
type_check(python_multiprocessing, (bool,), "python_multiprocessing") type_check(python_multiprocessing, (bool,), "python_multiprocessing")
check_cache_option(cache) check_cache_option(cache)
type_check(max_rowsize, (int,), "max_rowsize")
if callbacks is not None: if callbacks is not None:
if isinstance(callbacks, (list, tuple)): if isinstance(callbacks, (list, tuple)):

View File

@ -319,8 +319,13 @@ def test_deterministic_python_seed_multi_thread():
# Save original configuration values # Save original configuration values
num_parallel_workers_original = ds.config.get_num_parallel_workers() num_parallel_workers_original = ds.config.get_num_parallel_workers()
seed_original = ds.config.get_seed() seed_original = ds.config.get_seed()
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_num_parallel_workers(3) ds.config.set_num_parallel_workers(3)
ds.config.set_seed(0) ds.config.set_seed(0)
# Disable shared memory to save shm in CI
ds.config.set_enable_shared_mem(False)
# when we set the seed all operations within our dataset should be deterministic # when we set the seed all operations within our dataset should be deterministic
# First dataset # First dataset
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
@ -357,6 +362,7 @@ def test_deterministic_python_seed_multi_thread():
# Restore original configuration values # Restore original configuration values
ds.config.set_num_parallel_workers(num_parallel_workers_original) ds.config.set_num_parallel_workers(num_parallel_workers_original)
ds.config.set_seed(seed_original) ds.config.set_seed(seed_original)
ds.config.set_enable_shared_mem(mem_original)
def test_auto_num_workers_error(): def test_auto_num_workers_error():

View File

@ -425,8 +425,13 @@ def test_generator_14():
# and cause core dump and blocking in this UT. Add cleanup() here to fix it. # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
it._cleanup() # pylint: disable=W0212 it._cleanup() # pylint: disable=W0212
# Reduce memory needed by reducing queue size
prefetch_original = ds.config.get_prefetch_size()
ds.config.set_prefetch_size(1)
source = [(np.array([x]),) for x in range(256)] source = [(np.array([x]),) for x in range(256)]
ds1 = ds.GeneratorDataset(source, ["data"], sampler=ds.SequentialSampler(), num_parallel_workers=4).repeat(2) ds1 = ds.GeneratorDataset(source, ["data"], sampler=ds.SequentialSampler(),
num_parallel_workers=4, max_rowsize=1).repeat(2)
i = 0 i = 0
for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary
golden = np.array([i]) golden = np.array([i])
@ -435,6 +440,7 @@ def test_generator_14():
if i == 256: if i == 256:
i = 0 i = 0
ds.config.set_prefetch_size(prefetch_original)
def test_generator_15(): def test_generator_15():
""" """
@ -442,9 +448,14 @@ def test_generator_15():
""" """
logger.info("Test 1D Generator MP : 0 - 63") logger.info("Test 1D Generator MP : 0 - 63")
## Reduce memory needed by reducing queue size
prefetch_original = ds.config.get_prefetch_size()
ds.config.set_prefetch_size(1)
sampler = [x for x in range(256)] sampler = [x for x in range(256)]
source = [(np.array([x]),) for x in range(256)] source = [(np.array([x]),) for x in range(256)]
ds1 = ds.GeneratorDataset(source, ["data"], sampler=sampler, num_parallel_workers=4).repeat(2) ds1 = ds.GeneratorDataset(source, ["data"], sampler=sampler,
num_parallel_workers=4, max_rowsize=1).repeat(1)
i = 0 i = 0
for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary
golden = np.array([i]) golden = np.array([i])
@ -453,6 +464,7 @@ def test_generator_15():
if i == 256: if i == 256:
i = 0 i = 0
ds.config.set_prefetch_size(prefetch_original)
def test_generator_16(): def test_generator_16():
""" """
@ -499,6 +511,10 @@ def test_generator_18():
""" """
logger.info("Test map column order when input_columns is None.") logger.info("Test map column order when input_columns is None.")
# Reduce shm usage by disabling this optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations # apply dataset operations
data1 = ds.GeneratorDataset(generator_mc(2048), ["col0", "col1"], python_multiprocessing=True) data1 = ds.GeneratorDataset(generator_mc(2048), ["col0", "col1"], python_multiprocessing=True)
data1 = data1.map(operations=(lambda x: (x * 5)), output_columns=["out0"], num_parallel_workers=2, data1 = data1.map(operations=(lambda x: (x * 5)), output_columns=["out0"], num_parallel_workers=2,
@ -520,6 +536,7 @@ def test_generator_18():
golden = np.array([i * 5]) golden = np.array([i * 5])
np.testing.assert_array_equal(item["out0"], golden) np.testing.assert_array_equal(item["out0"], golden)
ds.config.set_enable_shared_mem(mem_original)
def test_generator_19(): def test_generator_19():
""" """

View File

@ -201,6 +201,11 @@ def test_graphdata_generatordataset():
Test generator dataset Test generator dataset
""" """
logger.info('test generator dataset.\n') logger.info('test generator dataset.\n')
#reduce memory required by disabling the shm optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
g = ds.GraphData(DATASET_FILE) g = ds.GraphData(DATASET_FILE)
batch_num = 2 batch_num = 2
edge_num = g.graph_info()['edge_num'][0] edge_num = g.graph_info()['edge_num'][0]
@ -218,6 +223,7 @@ def test_graphdata_generatordataset():
i += 1 i += 1
assert i == 40 assert i == 40
ds.config.set_enable_shared_mem(mem_original)
def test_graphdata_randomwalkdefault(): def test_graphdata_randomwalkdefault():
""" """

View File

@ -188,6 +188,10 @@ def test_case_7():
""" """
logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x") logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations # apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False) data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -201,6 +205,7 @@ def test_case_7():
np.testing.assert_array_equal(item["out"], golden) np.testing.assert_array_equal(item["out"], golden)
i = i + 4 i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_8(): def test_case_8():
""" """
@ -208,6 +213,10 @@ def test_case_8():
""" """
logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)") logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
col = ["col0", "col1"] col = ["col0", "col1"]
# apply dataset operations # apply dataset operations
@ -229,6 +238,7 @@ def test_case_8():
np.testing.assert_array_equal(item["out2"], golden) np.testing.assert_array_equal(item["out2"], golden)
i = i + 4 i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_9(): def test_case_9():
""" """
@ -236,6 +246,10 @@ def test_case_9():
""" """
logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x") logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations # apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False) data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -249,6 +263,7 @@ def test_case_9():
np.testing.assert_array_equal(item["out"], golden) np.testing.assert_array_equal(item["out"], golden)
i = i + 4 i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_10(): def test_case_10():
""" """
@ -256,6 +271,10 @@ def test_case_10():
""" """
logger.info("Test multiple map with multiprocess: lambda x : x + x") logger.info("Test multiple map with multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations # apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False) data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -271,6 +290,7 @@ def test_case_10():
np.testing.assert_array_equal(item["out"], golden) np.testing.assert_array_equal(item["out"], golden)
i = i + 4 i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_pyfunc_implicit_compose(): def test_pyfunc_implicit_compose():
""" """
@ -337,7 +357,8 @@ def test_func_with_yield_manifest_dataset_01():
DATA_FILE = "../data/dataset/testManifestData/test.manifest" DATA_FILE = "../data/dataset/testManifestData/test.manifest"
data = ds.ManifestDataset(DATA_FILE) data = ds.ManifestDataset(DATA_FILE)
data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True) data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True,
max_rowsize=1)
num_iter = 0 num_iter = 0
try: try:
for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True): for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):