!23205 Fix bus error with multiprocessing enabled, large image, shared_memory, single device

Merge pull request !23205 from RobinGrosman/mem_bus_failure
i-robot 2021-09-24 19:16:12 +00:00 committed by Gitee
commit d0023355a3
6 changed files with 71 additions and 13 deletions

View File

@ -428,7 +428,8 @@ class Dataset:
@check_batch
def batch(self, batch_size, drop_remainder=False, num_parallel_workers=None, per_batch_map=None,
input_columns=None, output_columns=None, column_order=None, pad_info=None, python_multiprocessing=False):
input_columns=None, output_columns=None, column_order=None, pad_info=None,
python_multiprocessing=False, max_rowsize=16):
"""
Combine batch_size number of consecutive rows into batches.
@ -470,6 +471,8 @@ class Dataset:
(default=None).
python_multiprocessing (bool, optional): Parallelize Python function per_batch_map with multi-processing.
This option could be beneficial if the function is computational heavy (default=False).
max_rowsize (int, optional): Maximum size of a row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default=16).
Returns:
BatchDataset, dataset batched.
@ -492,7 +495,7 @@ class Dataset:
>>> dataset = dataset.batch(batch_size=8, input_columns=["image"], per_batch_map=np_resize)
"""
return BatchDataset(self, batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns,
output_columns, column_order, pad_info, python_multiprocessing)
output_columns, column_order, pad_info, python_multiprocessing, max_rowsize)
@check_sync_wait
def sync_wait(self, condition_name, num_batch=1, callback=None):
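For context, a minimal usage sketch of the new max_rowsize argument on batch(); the generator source and the pass_through per_batch_map below are illustrative, not part of this change. With python_multiprocessing=True, each row exchanged with the worker processes is copied through a shared-memory block of at most max_rowsize MB.

import numpy as np
import mindspore.dataset as ds

# Illustrative per_batch_map: passes the batched column through unchanged.
def pass_through(col, batch_info):
    return (col,)

# Illustrative source: each row is a single ~1 MB float32 array.
source = [(np.ones((512, 512), dtype=np.float32),) for _ in range(64)]
dataset = ds.GeneratorDataset(source, ["image"], shuffle=False)

# max_rowsize (in MB) sizes the shared-memory slot used to pass rows between
# the main process and the per_batch_map workers; it only takes effect when
# python_multiprocessing=True.
dataset = dataset.batch(batch_size=8,
                        per_batch_map=pass_through,
                        input_columns=["image"],
                        python_multiprocessing=True,
                        max_rowsize=16)

for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True):
    pass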
@ -626,7 +629,7 @@ class Dataset:
@check_map
def map(self, operations, input_columns=None, output_columns=None, column_order=None,
num_parallel_workers=None, python_multiprocessing=False, cache=None, callbacks=None):
num_parallel_workers=None, python_multiprocessing=False, cache=None, callbacks=None, max_rowsize=16):
"""
Apply each operation in operations to this dataset.
@ -666,6 +669,8 @@ class Dataset:
cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing.
(default=None, which means no cache is used).
callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None).
max_rowsize (int, optional): Maximum size of a row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default=16).
Returns:
@ -759,7 +764,7 @@ class Dataset:
"""
return MapDataset(self, operations, input_columns, output_columns, column_order, num_parallel_workers,
python_multiprocessing, cache, callbacks)
python_multiprocessing, cache, callbacks, max_rowsize)
@check_filter
def filter(self, predicate, input_columns=None, num_parallel_workers=None):
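Likewise, a minimal sketch of map() with the new argument, mirroring the multiprocessing pattern used in the tests further down; the double function and row sizes are illustrative.

import numpy as np
import mindspore.dataset as ds

# Illustrative Python function; it must be picklable so worker processes can run it.
def double(x):
    return x + x

source = [(np.ones((256, 256), dtype=np.float32),) for _ in range(32)]
data = ds.GeneratorDataset(source, ["image"], shuffle=False)

# Rows handed to the workers are copied through shared memory; max_rowsize
# caps that per-row allocation in MB and is ignored when
# python_multiprocessing=False.
data = data.map(operations=double, input_columns=["image"],
                num_parallel_workers=2, python_multiprocessing=True,
                max_rowsize=4)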
@ -2085,7 +2090,7 @@ class BatchDataset(Dataset):
pad_info (dict, optional): Whether to perform padding on selected columns. pad_info={"col1":([224,224],0)}
will pad column with name "col1" to a tensor of size [224,224] and fill the missing with 0.
max_rowsize (int, optional): Maximum size of a row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default 16 MB).
data between processes. This is only used if python_multiprocessing is set to True (default=16).
"""
@ -2638,7 +2643,7 @@ class MapDataset(Dataset):
(default=None, which means no cache is used).
callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None)
max_rowsize (int, optional): Maximum size of a row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (default 16 MB).
data between processes. This is only used if python_multiprocessing is set to True (default=16).
Raises:
ValueError: If len(input_columns) != len(output_columns) and column_order is not specified.
@ -3646,7 +3651,7 @@ def _check_shm_usage(num_worker, queue_size, max_rowsize, num_queues=1):
when training in parallel mode.
"""
threshold_ratio = 0.8
if platform.system() != "Windows" and _get_device_num() > 1:
if platform.system() != "Windows":
shm_estimate_usage = _get_device_num() * num_worker * num_queues * \
(queue_size + 2) * max_rowsize * 1024 * 1024
try:
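Dropping the _get_device_num() > 1 condition means this guard now also runs for single-device training, which is where the reported bus error occurred. For intuition, a worked example of the estimate it computes; the numbers are illustrative, and the surrounding code (not fully shown here) compares the estimate against threshold_ratio of the shared memory actually available.

# Illustrative numbers for a single-device run.
device_num = 1      # the guard now applies even when _get_device_num() == 1
num_worker = 4
num_queues = 1
queue_size = 16     # prefetch size
max_rowsize = 16    # MB per row

shm_estimate_usage = device_num * num_worker * num_queues * \
    (queue_size + 2) * max_rowsize * 1024 * 1024
print(shm_estimate_usage / 1024 ** 3)   # 1.125 GiB of /dev/shm requested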

View File

@ -597,8 +597,9 @@ def check_batch(method):
@wraps(method)
def new_method(self, *args, **kwargs):
[batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns, output_columns,
column_order, pad_info, python_multiprocessing], param_dict = parse_user_args(method, *args, **kwargs)
[batch_size, drop_remainder, num_parallel_workers, per_batch_map,
input_columns, output_columns, column_order, pad_info,
python_multiprocessing, max_rowsize], param_dict = parse_user_args(method, *args, **kwargs)
if not (isinstance(batch_size, int) or (callable(batch_size))):
raise TypeError("batch_size should either be an int or a callable.")
@ -613,6 +614,7 @@ def check_batch(method):
if num_parallel_workers is not None:
check_num_parallel_workers(num_parallel_workers)
type_check(drop_remainder, (bool,), "drop_remainder")
type_check(max_rowsize, (int,), "max_rowsize")
if (pad_info is not None) and (per_batch_map is not None):
raise ValueError("pad_info and per_batch_map can't both be set.")
@ -683,7 +685,7 @@ def check_map(method):
def new_method(self, *args, **kwargs):
from mindspore.dataset.callback import DSCallback
[_, input_columns, output_columns, column_order, num_parallel_workers, python_multiprocessing, cache,
callbacks], _ = \
callbacks, max_rowsize], _ = \
parse_user_args(method, *args, **kwargs)
nreq_param_columns = ['input_columns', 'output_columns', 'column_order']
@ -694,6 +696,7 @@ def check_map(method):
check_num_parallel_workers(num_parallel_workers)
type_check(python_multiprocessing, (bool,), "python_multiprocessing")
check_cache_option(cache)
type_check(max_rowsize, (int,), "max_rowsize")
if callbacks is not None:
if isinstance(callbacks, (list, tuple)):
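With the new type_check calls in check_batch and check_map, a non-integer max_rowsize now fails fast when the operation is declared. A small illustration; the dataset below is illustrative and the exact error text is not reproduced here.

import numpy as np
import mindspore.dataset as ds

source = [(np.array([i]),) for i in range(4)]
data = ds.GeneratorDataset(source, ["data"], shuffle=False)

# max_rowsize must be an int; a string is rejected by the validator.
try:
    data = data.map(operations=(lambda x: x + x), input_columns=["data"],
                    python_multiprocessing=True, max_rowsize="16")
except TypeError as err:
    print(err)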

View File

@ -319,8 +319,13 @@ def test_deterministic_python_seed_multi_thread():
# Save original configuration values
num_parallel_workers_original = ds.config.get_num_parallel_workers()
seed_original = ds.config.get_seed()
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_num_parallel_workers(3)
ds.config.set_seed(0)
# Disable shared memory to reduce shm usage in CI
ds.config.set_enable_shared_mem(False)
# when we set the seed all operations within our dataset should be deterministic
# First dataset
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
@ -357,6 +362,7 @@ def test_deterministic_python_seed_multi_thread():
# Restore original configuration values
ds.config.set_num_parallel_workers(num_parallel_workers_original)
ds.config.set_seed(seed_original)
ds.config.set_enable_shared_mem(mem_original)
def test_auto_num_workers_error():

View File

@ -425,8 +425,13 @@ def test_generator_14():
# and cause core dump and blocking in this UT. Add cleanup() here to fix it.
it._cleanup() # pylint: disable=W0212
# Reduce memory needed by reducing queue size
prefetch_original = ds.config.get_prefetch_size()
ds.config.set_prefetch_size(1)
source = [(np.array([x]),) for x in range(256)]
ds1 = ds.GeneratorDataset(source, ["data"], sampler=ds.SequentialSampler(), num_parallel_workers=4).repeat(2)
ds1 = ds.GeneratorDataset(source, ["data"], sampler=ds.SequentialSampler(),
num_parallel_workers=4, max_rowsize=1).repeat(2)
i = 0
for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary
golden = np.array([i])
@ -435,6 +440,7 @@ def test_generator_14():
if i == 256:
i = 0
ds.config.set_prefetch_size(prefetch_original)
def test_generator_15():
"""
@ -442,9 +448,14 @@ def test_generator_15():
"""
logger.info("Test 1D Generator MP : 0 - 63")
# Reduce memory needed by reducing queue size
prefetch_original = ds.config.get_prefetch_size()
ds.config.set_prefetch_size(1)
sampler = [x for x in range(256)]
source = [(np.array([x]),) for x in range(256)]
ds1 = ds.GeneratorDataset(source, ["data"], sampler=sampler, num_parallel_workers=4).repeat(2)
ds1 = ds.GeneratorDataset(source, ["data"], sampler=sampler,
num_parallel_workers=4, max_rowsize=1).repeat(1)
i = 0
for data in ds1.create_dict_iterator(num_epochs=1, output_numpy=True): # each data is a dictionary
golden = np.array([i])
@ -453,6 +464,7 @@ def test_generator_15():
if i == 256:
i = 0
ds.config.set_prefetch_size(prefetch_original)
def test_generator_16():
"""
@ -499,6 +511,10 @@ def test_generator_18():
"""
logger.info("Test map column order when input_columns is None.")
# Reduce shm usage by disabling this optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations
data1 = ds.GeneratorDataset(generator_mc(2048), ["col0", "col1"], python_multiprocessing=True)
data1 = data1.map(operations=(lambda x: (x * 5)), output_columns=["out0"], num_parallel_workers=2,
@ -520,6 +536,7 @@ def test_generator_18():
golden = np.array([i * 5])
np.testing.assert_array_equal(item["out0"], golden)
ds.config.set_enable_shared_mem(mem_original)
def test_generator_19():
"""

View File

@ -201,6 +201,11 @@ def test_graphdata_generatordataset():
Test generator dataset
"""
logger.info('test generator dataset.\n')
# Reduce memory required by disabling the shm optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
g = ds.GraphData(DATASET_FILE)
batch_num = 2
edge_num = g.graph_info()['edge_num'][0]
@ -218,6 +223,7 @@ def test_graphdata_generatordataset():
i += 1
assert i == 40
ds.config.set_enable_shared_mem(mem_original)
def test_graphdata_randomwalkdefault():
"""

View File

@ -188,6 +188,10 @@ def test_case_7():
"""
logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -201,6 +205,7 @@ def test_case_7():
np.testing.assert_array_equal(item["out"], golden)
i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_8():
"""
@ -208,6 +213,10 @@ def test_case_8():
"""
logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
col = ["col0", "col1"]
# apply dataset operations
@ -229,6 +238,7 @@ def test_case_8():
np.testing.assert_array_equal(item["out2"], golden)
i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_9():
"""
@ -236,6 +246,10 @@ def test_case_9():
"""
logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -249,6 +263,7 @@ def test_case_9():
np.testing.assert_array_equal(item["out"], golden)
i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_case_10():
"""
@ -256,6 +271,10 @@ def test_case_10():
"""
logger.info("Test multiple map with multiprocess: lambda x : x + x")
# Reduce memory required by disabling the shared memory optimization
mem_original = ds.config.get_enable_shared_mem()
ds.config.set_enable_shared_mem(False)
# apply dataset operations
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
@ -271,6 +290,7 @@ def test_case_10():
np.testing.assert_array_equal(item["out"], golden)
i = i + 4
ds.config.set_enable_shared_mem(mem_original)
def test_pyfunc_implicit_compose():
"""
@ -337,7 +357,8 @@ def test_func_with_yield_manifest_dataset_01():
DATA_FILE = "../data/dataset/testManifestData/test.manifest"
data = ds.ManifestDataset(DATA_FILE)
data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True)
data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True,
max_rowsize=1)
num_iter = 0
try:
for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):