From 0212ee3e4ee44050378a39c7229160d9f2fc60b2 Mon Sep 17 00:00:00 2001
From: xiefangqi
Date: Thu, 10 Jun 2021 11:37:47 +0800
Subject: [PATCH] fix shm problem and apply_async problem

Fall back to calling the Python callable in the current process when the
worker pool is no longer running, raise an actionable RuntimeError (with
the recommended shm size) when multiprocessing.Event() or the worker
queue cannot be created due to insufficient shm, and fix the invalid
e.message access during deepcopy.

---
 mindspore/dataset/engine/datasets.py | 19 +++++++++++++++----
 mindspore/dataset/engine/queue.py    |  1 +
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py
index 63fd79f5698..e300f50364a 100644
--- a/mindspore/dataset/engine/datasets.py
+++ b/mindspore/dataset/engine/datasets.py
@@ -2449,7 +2449,10 @@ class _PythonCallable:
 
             # This call will send the tensors along with Python callable index to the process pool.
             # Block, yield GIL. Current thread will reacquire GIL once result is returned.
-            result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, qid, []])
+            if self._pool_is_running() and check_iterator_cleanup() is False:
+                result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, qid, []])
+            else:
+                return self.py_callable(*args)
         else:
             result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, -1, *args])
 
@@ -3525,7 +3528,11 @@ class SamplerFn:
         self.pid = []
         # Event for end of epoch
         if multi_process is True:
-            self.eof = multiprocessing.Event()
+            try:
+                self.eof = multiprocessing.Event()
+            except Exception:
+                raise RuntimeError("Init multiprocessing.Event() failed. This might be caused by insufficient shm,"
+                                   + " and the recommended shm size is at least 5 GB.")
         else:
             self.eof = threading.Event()
         # Create workers
@@ -3537,7 +3544,11 @@ class SamplerFn:
 
         for _ in range(num_worker):
             if multi_process is True:
-                worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size)
+                try:
+                    worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size)
+                except Exception:
+                    raise RuntimeError("Init multiprocessing.Queue() failed. This might be caused by insufficient shm,"
+                                       + " and the recommended shm size is at least 5 GB.")
                 worker.daemon = True
                 # When multi processes fork a subprocess, the lock of the main process is copied to the subprocess,
                 # which may cause deadlock. Therefore, the subprocess startup is performed in the initialization phase.
@@ -3921,7 +3932,7 @@ class GeneratorDataset(MappableDataset):
                 new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn(sample_ids, self.source))
                 new_op.sample_fn = sample_fn
             except RuntimeError as e:
-                raise Exception("Init failed during deepcopy, reason is", e.message)
+                raise Exception(str(e))
             else:
                 try:
                     new_op.sampler = None
diff --git a/mindspore/dataset/engine/queue.py b/mindspore/dataset/engine/queue.py
index 8b593e006cd..8df45a1768c 100644
--- a/mindspore/dataset/engine/queue.py
+++ b/mindspore/dataset/engine/queue.py
@@ -64,6 +64,7 @@ class _SharedQueue(multiprocessing.queues.Queue):
                 + "bytes, "
                 + str(self.num_seg)
                 + " elements."
+                + " This might be caused by insufficient shm, and the recommended shm size is at least 5 GB."
             )
 
     def put(self, data, timeout=None):
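
Notes (not part of the commit):

1) The apply_async hunk guards pool dispatch and falls back to running the
Python callable inline once the pool has stopped (e.g. during iterator
cleanup). Below is a minimal self-contained sketch of that pattern; it is
an illustration, not MindSpore code, and GuardedCallable/square are made-up
names standing in for _PythonCallable/_pyfunc_worker_exec:

    import multiprocessing

    def square(x):
        # Stand-in for the real per-worker callable; must be picklable,
        # hence defined at module level.
        return x * x

    class GuardedCallable:
        """Dispatch work to a process pool, falling back to an inline call
        when the pool is no longer running."""

        def __init__(self, py_callable, pool):
            self.py_callable = py_callable
            self.pool = pool

        def __call__(self, *args):
            if self.pool is not None:
                try:
                    # On CPython 3.8+, apply_async on a closed/terminated
                    # pool raises ValueError("Pool not running").
                    return self.pool.apply_async(self.py_callable, args).get(timeout=30)
                except ValueError:
                    pass  # pool already shut down; fall through to inline call
            return self.py_callable(*args)

    if __name__ == "__main__":
        with multiprocessing.Pool(2) as pool:
            f = GuardedCallable(square, pool)
            print(f(3))  # runs in a pool worker -> 9
        print(f(4))      # pool terminated -> inline fallback -> 16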
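
2) The try/except added around multiprocessing.Event() and the worker
queues exists because these primitives are backed by shared memory
(/dev/shm on Linux, via POSIX semaphores and shared segments); when shm is
exhausted, construction fails deep inside multiprocessing, typically with
OSError errno 28 ("No space left on device"). A sketch of the same
wrapping idea as a standalone helper (create_mp_event is a hypothetical
name, not part of the patch):

    import multiprocessing

    def create_mp_event():
        """Create a multiprocessing.Event(), translating a low-level shm
        failure into the actionable error the patch recommends."""
        try:
            return multiprocessing.Event()
        except OSError as err:
            raise RuntimeError(
                "Init multiprocessing.Event() failed. This might be caused by insufficient shm,"
                " and the recommended shm size is at least 5 GB."
            ) from err

On Linux the current shm capacity can be checked with `df -h /dev/shm` and
raised via the size option of the tmpfs mount (or --shm-size for Docker
containers).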