forked from mindspore-Ecosystem/mindspore
!19712 Fix a problem that if kill -9 dataset subprocesses, these subprocesses will turn to zombie processes
Merge pull request !19712 from xiefangqi/md_fix_zombie_subprocesses
This commit is contained in:
commit
68977b7c47
|
@ -19,6 +19,7 @@ high performance and parses data precisely. Some of the operations that are
|
|||
provided to users to preprocess data include shuffle, batch, repeat, map, and zip.
|
||||
"""
|
||||
import atexit
|
||||
import errno
|
||||
import glob
|
||||
import json
|
||||
import math
|
||||
|
@ -2151,6 +2152,9 @@ class BatchDataset(Dataset):
|
|||
arg_q_list = []
|
||||
res_q_list = []
|
||||
|
||||
# Register clean zombie subprocesses signal here
|
||||
signal.signal(signal.SIGCHLD, wait_child_processes)
|
||||
|
||||
# If user didn't specify num_parallel_workers, set it to default
|
||||
if self.num_parallel_workers is not None:
|
||||
num_parallel = self.num_parallel_workers
|
||||
|
@ -2378,6 +2382,18 @@ class ShuffleDataset(Dataset):
|
|||
return True
|
||||
|
||||
|
||||
# This wait function is for cleaning zombie subprocesses
|
||||
def wait_child_processes(signum, frame):
|
||||
try:
|
||||
while True:
|
||||
child_pid, _ = os.waitpid(-1, os.WNOHANG)
|
||||
if child_pid == 0:
|
||||
break
|
||||
except OSError as e:
|
||||
if e.errno != errno.ECHILD:
|
||||
raise
|
||||
|
||||
|
||||
# Pyfunc collection for multiprocess pyfunc
|
||||
# This global variable will only be used within subprocesses
|
||||
_GLOBAL_PYFUNC_LIST = []
|
||||
|
@ -2634,6 +2650,9 @@ class MapDataset(Dataset):
|
|||
callable_list.append(op)
|
||||
|
||||
if callable_list:
|
||||
# Register clean zombie subprocesses signal here
|
||||
signal.signal(signal.SIGCHLD, wait_child_processes)
|
||||
|
||||
# Construct pool with the callable list
|
||||
# The callable list and _pyfunc_worker_init are used to pass lambda function in to subprocesses
|
||||
self.process_pool = multiprocessing.Pool(processes=num_parallel,
|
||||
|
@ -3577,6 +3596,9 @@ class SamplerFn:
|
|||
self.pid = []
|
||||
# Event for end of epoch
|
||||
if multi_process is True:
|
||||
# Register clean zombie subprocesses signal here
|
||||
signal.signal(signal.SIGCHLD, wait_child_processes)
|
||||
|
||||
try:
|
||||
self.eof = multiprocessing.Event()
|
||||
except:
|
||||
|
|
Loading…
Reference in New Issue