fix attr failed problem

This commit is contained in:
xiefangqi 2021-09-10 15:02:03 +08:00
parent 4e3a91f0ae
commit 66c5f937dd
3 changed files with 20 additions and 13 deletions

View File

@ -17,6 +17,7 @@ The configuration module provides various functions to set and get the supported
configuration parameters, and read a configuration file.
"""
import os
import platform
import random
import time
import numpy
@ -428,6 +429,10 @@ def get_enable_shared_mem():
>>> # Get the flag of shared memory feature.
>>> shared_mem_flag = ds.config.get_enable_shared_mem()
"""
# For windows we forbid shared mem function temporarily
if platform.system().lower() == 'windows':
logger.warning("For windows we forbid shared mem function temporarily.")
return False
return _config.get_enable_shared_mem()

View File

@ -2210,10 +2210,11 @@ class BatchDataset(Dataset):
# If Python version greater than 3.8, we need to close ThreadPool in atexit for unclean pool teardown.
if sys.version_info >= (3, 8):
atexit.register(self.process_pool.close)
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_watch_dog, args=(self.eot, self.pids))
self.watch_dog.daemon = True
self.watch_dog.start()
if platform.system().lower() != 'windows':
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_watch_dog, args=(self.eot, self.pids))
self.watch_dog.daemon = True
self.watch_dog.start()
else:
if self.per_batch_map is not None:
self.per_batch_map = FuncWrapper(self.per_batch_map)
@ -2225,7 +2226,7 @@ class BatchDataset(Dataset):
def __del__(self):
if hasattr(self, 'process_pool') and self.process_pool is not None:
self.process_pool.close()
if self.watch_dog is not None and self.eot is not None:
if hasattr(self, 'watch_dog') and self.watch_dog is not None and hasattr(self, 'eot') and self.eot is not None:
self._abort_watchdog()
@ -2750,10 +2751,11 @@ class MapDataset(Dataset):
# If Python version greater than 3.8, we need to close ThreadPool in atexit for unclean pool teardown.
if sys.version_info >= (3, 8):
atexit.register(self.process_pool.close)
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_watch_dog, args=(self.eot, self.pids))
self.watch_dog.daemon = True
self.watch_dog.start()
if platform.system().lower() != 'windows':
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_watch_dog, args=(self.eot, self.pids))
self.watch_dog.daemon = True
self.watch_dog.start()
def _abort_watchdog(self):
if not self.eot.is_set():
@ -2763,7 +2765,7 @@ class MapDataset(Dataset):
if hasattr(self, 'process_pool') and self.process_pool is not None:
self.process_pool.close()
self.process_pool.join()
if self.watch_dog is not None and self.eot is not None:
if hasattr(self, 'watch_dog') and self.watch_dog is not None and hasattr(self, 'eot') and self.eot is not None:
self._abort_watchdog()
@ -3716,7 +3718,7 @@ class SamplerFn:
worker = _GeneratorWorkerMt(dataset, self.eof)
worker.daemon = True
self.workers.append(worker)
if multi_process is True:
if multi_process is True and platform.system().lower() != 'windows':
self.eot = threading.Event()
self.watch_dog = threading.Thread(target=_watch_dog, args=(self.eot, self.pids))
self.watch_dog.daemon = True
@ -3776,7 +3778,7 @@ class SamplerFn:
self._abort_watchdog()
def _abort_watchdog(self):
if not self.eot.is_set():
if hasattr(self, 'eot') and self.eot is not None and not self.eot.is_set():
self.eot.set()
def __del__(self):

View File

@ -260,7 +260,7 @@ def test_case_10():
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
data1 = data1.map(operations=[(lambda x: x * 10)], input_columns="col0",
output_columns="out", num_parallel_workers=4, python_multiprocessing=True)
output_columns="out", num_parallel_workers=4)
data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + 1), (lambda x: x + 2)], input_columns="out",
output_columns="out", num_parallel_workers=4, python_multiprocessing=True)