Fix zombie subprocess problem

This commit is contained in:
xiefangqi 2021-07-08 16:45:08 +08:00
parent 1ac696d044
commit ec929d4cf6
1 changed files with 22 additions and 0 deletions

View File

@ -19,6 +19,7 @@ high performance and parses data precisely. Some of the operations that are
provided to users to preprocess data include shuffle, batch, repeat, map, and zip.
"""
import atexit
import errno
import glob
import json
import math
@ -2151,6 +2152,9 @@ class BatchDataset(Dataset):
arg_q_list = []
res_q_list = []
# Register clean zombie subprocesses signal here
signal.signal(signal.SIGCHLD, wait_child_processes)
# If user didn't specify num_parallel_workers, set it to default
if self.num_parallel_workers is not None:
num_parallel = self.num_parallel_workers
@ -2378,6 +2382,18 @@ class ShuffleDataset(Dataset):
return True
def wait_child_processes(signum, frame):
    """Reap every child process that has already exited.

    Installed elsewhere in this module as the SIGCHLD handler so that
    terminated worker subprocesses do not linger as zombies.

    Args:
        signum: Signal number passed in by the signal machinery (unused).
        frame: Interrupted stack frame passed in by the signal machinery (unused).

    Raises:
        OSError: Any ``os.waitpid`` failure other than "no child processes".
    """
    try:
        # Non-blocking wait on any child: a pid of 0 means the remaining
        # children are still running, so there is nothing more to collect.
        while os.waitpid(-1, os.WNOHANG)[0] != 0:
            pass
    except ChildProcessError:
        # ECHILD: this process has no children left to wait for.
        return
# Pyfunc collection for multiprocess pyfunc
# This global variable will only be used within subprocesses
_GLOBAL_PYFUNC_LIST = []
@ -2634,6 +2650,9 @@ class MapDataset(Dataset):
callable_list.append(op)
if callable_list:
# Register clean zombie subprocesses signal here
signal.signal(signal.SIGCHLD, wait_child_processes)
# Construct pool with the callable list
# The callable list and _pyfunc_worker_init are used to pass lambda function in to subprocesses
self.process_pool = multiprocessing.Pool(processes=num_parallel,
@ -3577,6 +3596,9 @@ class SamplerFn:
self.pid = []
# Event for end of epoch
if multi_process is True:
# Register clean zombie subprocesses signal here
signal.signal(signal.SIGCHLD, wait_child_processes)
try:
self.eof = multiprocessing.Event()
except: