!18122 MindData: fix insufficient shm problem and pool-is-not-running problem

Merge pull request !18122 from xiefangqi/md_fix_shm_problem
i-robot 2021-06-11 16:39:49 +08:00 committed by Gitee
commit 6f01230ce2
2 changed files with 16 additions and 4 deletions


@@ -2456,7 +2456,10 @@ class _PythonCallable:
             # This call will send the tensors along with Python callable index to the process pool.
             # Block, yield GIL. Current thread will reacquire GIL once result is returned.
-            result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, qid, []])
+            if self._pool_is_running() and check_iterator_cleanup() is False:
+                result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, qid, []])
+            else:
+                return self.py_callable(*args)
         else:
             result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, -1, *args])
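
The guard above keeps apply_async from being called on a pool that has already shut down during iterator cleanup. Below is a minimal, self-contained sketch of the same fall-back pattern; SafeCallable and square are illustrative stand-ins, not MindSpore APIs, and the check_iterator_cleanup condition is omitted:

from multiprocessing import Pool
from multiprocessing.pool import RUN


def square(x):
    return x * x


class SafeCallable:
    """Dispatch to a process pool, falling back to in-thread execution."""

    def __init__(self, func, pool):
        self.func = func
        self.pool = pool

    def _pool_is_running(self):
        # _state is a private Pool attribute; RUN is 0 on Python 3.7 and
        # the string "RUN" on 3.8+, so compare against the imported constant.
        return self.pool is not None and self.pool._state == RUN

    def __call__(self, *args):
        if self._pool_is_running():
            # Block until a worker process returns the result.
            return self.pool.apply_async(self.func, args).get()
        # Pool already closed/terminated: run the callable directly in the
        # current thread instead of raising ValueError("Pool not running").
        return self.func(*args)


if __name__ == "__main__":
    with Pool(2) as pool:
        f = SafeCallable(square, pool)
        print(f(3))  # dispatched to a worker process -> 9
    print(f(4))      # pool terminated by the with-block -> in-thread -> 16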
@@ -3532,7 +3535,11 @@ class SamplerFn:
         self.pid = []
         # Event for end of epoch
         if multi_process is True:
-            self.eof = multiprocessing.Event()
+            try:
+                self.eof = multiprocessing.Event()
+            except:
+                raise RuntimeError("Init multiprocessing.Event() failed. This might be caused by insufficient shm,"
+                                   + " and the recommended shm size is at least 5 GB.")
         else:
             self.eof = threading.Event()
         # Create workers
@@ -3544,7 +3551,11 @@ class SamplerFn:
         for _ in range(num_worker):
             if multi_process is True:
-                worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size)
+                try:
+                    worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size)
+                except:
+                    raise RuntimeError("Init multiprocessing.Queue() failed. This might be caused by insufficient shm,"
+                                       + " and the recommended shm size is at least 5 GB.")
                 worker.daemon = True
                 # When multiprocessing forks a subprocess, the lock of the main process is copied to the subprocess,
                 # which may cause deadlock. Therefore, the subprocess startup is performed in the initialization phase.
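
Both hunks wrap the creation of a multiprocessing primitive so that a low-level failure (typically OSError: [Errno 28] No space left on device when /dev/shm is exhausted) surfaces with an actionable hint. A sketch of the same pattern follows, assuming an illustrative helper name (make_mp_primitives is not a MindSpore API); unlike the bare except: above, it chains the original exception so the root cause stays in the traceback:

import multiprocessing


def make_mp_primitives(queue_size=16):
    """Create the inter-process event and queue, diagnosing shm exhaustion."""
    try:
        eof = multiprocessing.Event()
        data_queue = multiprocessing.Queue(queue_size)
    except Exception as err:
        # Event() and Queue() allocate POSIX semaphores and shared memory;
        # on a starved /dev/shm they fail with an opaque low-level error.
        raise RuntimeError(
            "Init multiprocessing primitives failed. This might be caused by "
            "insufficient shm, and the recommended shm size is at least 5 GB."
        ) from err
    return eof, data_queue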
@@ -3928,7 +3939,7 @@ class GeneratorDataset(MappableDataset):
                 new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn(sample_ids, self.source))
                 new_op.sample_fn = sample_fn
             except RuntimeError as e:
-                raise Exception("Init failed during deepcopy, reason is", e.message)
+                raise Exception(str(e))
         else:
             try:
                 new_op.sampler = None
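
The one-line change above fixes a Python 3 pitfall: exception objects no longer carry a .message attribute, so the old handler raised AttributeError inside the except block and masked the original error. A quick demonstration:

try:
    raise RuntimeError("Init failed during deepcopy")
except RuntimeError as e:
    print(hasattr(e, "message"))  # False: .message was removed in Python 3
    print(str(e))                 # prints the original error text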


@@ -64,6 +64,7 @@ class _SharedQueue(multiprocessing.queues.Queue):
                 + "bytes, "
                 + str(self.num_seg)
                 + " elements."
+                + " This might be caused by insufficient shm, and the recommended shm size is at least 5 GB."
             )

     def put(self, data, timeout=None):
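
The second file extends _SharedQueue's allocation-failure message with the same shm hint. On Linux, where multiprocessing shared memory is backed by the /dev/shm tmpfs, a caller could also verify the "at least 5 GB" recommendation up front; the following is a hedged sketch under that assumption (shm_free_bytes is illustrative, not part of MindSpore):

import os

SHM_PATH = "/dev/shm"
RECOMMENDED_SHM_BYTES = 5 * 1024 ** 3  # 5 GB, matching the message above


def shm_free_bytes(path=SHM_PATH):
    # statvfs reports the free blocks and fragment size of the tmpfs mount.
    stat = os.statvfs(path)
    return stat.f_bavail * stat.f_frsize


if __name__ == "__main__":
    free = shm_free_bytes()
    if free < RECOMMENDED_SHM_BYTES:
        print("Warning: only %.1f GB free in %s; shared-memory queues may "
              "fail to allocate." % (free / 1024 ** 3, SHM_PATH))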