!26234 New dataset config option, auto_offload

Merge pull request !26234 from markuskunej/offload_config
i-robot 2021-11-19 03:44:56 +00:00 committed by Gitee
commit f38df5c888
7 changed files with 167 additions and 22 deletions

View File

@@ -57,6 +57,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("set_worker_connector_size", &ConfigManager::set_worker_connector_size)
.def("set_enable_shared_mem", &ConfigManager::set_enable_shared_mem)
.def("get_enable_shared_mem", &ConfigManager::enable_shared_mem)
.def("set_auto_offload", &ConfigManager::set_auto_offload)
.def("get_auto_offload", &ConfigManager::get_auto_offload)
.def("load", [](ConfigManager &c, std::string s) { THROW_IF_ERROR(c.LoadFile(s)); });
}));
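
These bindings expose the new C++ setter and getter to Python. A minimal sketch of how the Python layer reaches them, assuming the internal cde.GlobalContext.config_manager() handle used by mindspore.dataset.core.config (the exact module path is an assumption, not confirmed by this diff):

import mindspore._c_dataengine as cde  # internal module; path is an assumption

_config = cde.GlobalContext.config_manager()
_config.set_auto_offload(True)     # dispatches to ConfigManager::set_auto_offload
print(_config.get_auto_offload())  # dispatches to ConfigManager::get_auto_offload -> True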

View File

@@ -50,7 +50,8 @@ ConfigManager::ConfigManager()
num_cpu_threads_(std::thread::hardware_concurrency()),
auto_num_workers_num_shards_(1),
auto_worker_config_(0),
enable_shared_mem_(true) {
enable_shared_mem_(true),
auto_offload_(false) {
num_cpu_threads_ = num_cpu_threads_ > 0 ? num_cpu_threads_ : std::numeric_limits<uint16_t>::max();
num_parallel_workers_ = num_parallel_workers_ < num_cpu_threads_ ? num_parallel_workers_ : num_cpu_threads_;
std::string env_cache_host = common::GetEnv("MS_CACHE_HOST");

View File

@@ -222,6 +222,14 @@ class ConfigManager {
// @return - Flag to indicate whether shared memory for multi-processing is enabled
bool enable_shared_mem() { return enable_shared_mem_; }
// setter function
// @param offload - To enable automatic offloading of dataset ops
void set_auto_offload(bool offload) { auto_offload_ = offload; }
// getter function
// @return - Flag to indicate whether automatic offloading is enabled for the dataset
bool get_auto_offload() { return auto_offload_; }
private:
int32_t num_parallel_workers_;
int32_t worker_connector_size_;
@@ -244,6 +252,7 @@
int32_t auto_num_workers_num_shards_;
uint8_t auto_worker_config_;
bool enable_shared_mem_;
bool auto_offload_;
// Private helper function that takes a nlohmann json format and populates the settings
// @param j - The json nlohmann json info
Status FromJson(const nlohmann::json &j);

View File

@@ -471,3 +471,37 @@ def set_sending_batches(batch_num):
if not isinstance(batch_num, int):
raise TypeError("batch_num must be an int dtype.")
_config.set_sending_batches(batch_num)
def set_auto_offload(offload):
"""
Set the automatic offload flag of the dataset. If set_auto_offload is True, as many dataset
operations as possible are automatically offloaded from the CPU to the device (GPU or Ascend).
Args:
offload (bool): Whether to use the automatic offload feature.
Raises:
TypeError: If offload is not a boolean data type.
Examples:
>>> # Enable automatic offload feature
>>> ds.config.set_auto_offload(True)
"""
if not isinstance(offload, bool):
raise TypeError("offload must be a bool dtype.")
_config.set_auto_offload(offload)
def get_auto_offload():
"""
Get the state of the automatic offload flag (True or False).
Returns:
bool, whether the automatic offload feature is enabled.
Examples:
>>> # Get the global configuration of the automatic offload feature.
>>> auto_offload = ds.config.get_auto_offload()
"""
return _config.get_auto_offload()
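
Together, the two wrappers form a simple global toggle. A short usage sketch of the public API added above:

import mindspore.dataset as ds

# Enable automatic offload globally, then read the setting back.
ds.config.set_auto_offload(True)
assert ds.config.get_auto_offload() is True

# A non-bool argument is rejected by the isinstance check above.
try:
    ds.config.set_auto_offload("yes")
except TypeError as err:
    print(err)  # offload must be a bool dtype.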

View File

@@ -70,7 +70,7 @@ from .validators import check_batch, check_shuffle, check_map, check_filter, che
check_sbu_dataset, check_qmnist_dataset, check_emnist_dataset, check_fake_image_dataset, check_places365_dataset, \
check_photo_tour_dataset, check_ag_news_dataset, check_dbpedia_dataset, check_lj_speech_dataset
from ..core.config import get_callback_timeout, _init_device_info, get_enable_shared_mem, get_num_parallel_workers, \
get_prefetch_size
get_prefetch_size, get_auto_offload
from ..core.datatypes import mstype_to_detype, mstypelist_to_detypelist
from ..core.validator_helpers import replace_none
from ..core.py_util_helpers import ExceptionHandler
@@ -95,7 +95,7 @@ ShuffleToShuffleMode = {Shuffle.FILES: cde.ShuffleMode.FILES,
def get_offloadable_ops(operations):
"""
Check if operations are supported by offload hardware accelarator.
Check if operations are supported by offload hardware accelerator.
Args:
operations: list of operations.
@@ -116,6 +116,72 @@ def get_offloadable_ops(operations):
return is_offloadable
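
Judging from how check_offload_map consumes the result below (iteration over keys, offloadable_ops[op], and list(offloadable_ops.keys()) in map order), get_offloadable_ops returns an ordered mapping from operation name to an offload-supported flag. A hypothetical sketch of that contract (the names in the supported set are illustrative; the real set is defined elsewhere in the source):

from collections import OrderedDict

def get_offloadable_ops_sketch(operations):
    # Hypothetical stand-in: assume only these ops are offload-capable.
    supported = {"RandomHorizontalFlip", "RandomVerticalFlip"}
    is_offloadable = OrderedDict()
    for op in operations:
        name = type(op).__name__
        is_offloadable[name] = name in supported
    return is_offloadable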
def check_offload_map(operations, output_columns):
"""
Check if operations are supported by the offload hardware accelerator. If not, check whether the list of
operations can be split into two groups: a leading group that is not supported by offload and a trailing
group that is.
Args:
operations: list of operations.
output_columns: list of names assigned to the columns output by the last operation.
Returns:
bool, indicates whether to use the offload hardware accelerator.
bool, indicates whether the list of map operations can be split.
list, first group of operations, which are not offload-supported.
list, second group of operations, which are offload-supported.
"""
offloadable_ops = get_offloadable_ops(operations)
offload = True
can_split = False
offload_ops = []
non_offload_ops = []
invalid_ops = []
for op in offloadable_ops:
if offloadable_ops[op] is not True:
offload = False
invalid_ops.append(op)
if not offload:
logger.warning(("In map(), offload is set to True, but offload is not supported for the following "
"operation(s): {}").format(invalid_ops))
if output_columns:
# Cannot split (currently): it is unclear which of the operations alter the output columns.
logger.warning("Since output_columns is specified, the list of operations cannot be split. "
"Unsure which operation(s) alter the columns. Setting offload to False.")
else:
# See if the map operator can be split and then offloaded
size = len(offloadable_ops)
idx = size
split_idx = size
op_names = list(offloadable_ops.keys())
for op_name in reversed(op_names):
if not offloadable_ops[op_name]:
# Scanning in reverse order, this is the last op that cannot be offloaded, so split here.
split_idx = idx
break
idx = idx - 1
if split_idx == size:
# The last op in the list cannot be offloaded, therefore nothing can be offloaded.
# Nothing to split.
logger.warning(("The last operation, {}, is not supported by offload, setting offload"
" to False").format(op_names[split_idx - 1]))
elif split_idx != 0:
# There is at least one offloadable op at the end of the list.
# Split map() after the last non-offloadable op and only offload the second list of operations.
can_split = True
non_offload_ops = operations[:split_idx]
offload_ops = operations[split_idx:]
logger.warning(("The list of operations in map() can be split into two: {}, {}\n"
"The second list of operations will be run with offload=True"
).format(op_names[:split_idx], op_names[split_idx:]))
return offload, can_split, non_offload_ops, offload_ops
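
To make the reverse-scan split rule concrete, here is a standalone re-implementation of just the index computation, with a worked example (the op names are illustrative):

def split_index(offloadable):
    # offloadable: ordered dict of op name -> bool, as in check_offload_map.
    names = list(offloadable.keys())
    for idx in range(len(names), 0, -1):
        if not offloadable[names[idx - 1]]:
            return idx  # split just after the last non-offloadable op
    return 0            # every op is offloadable; nothing to split off

# Decode is marked non-offloadable, the two ops after it are marked
# offloadable, so the split lands at index 1: operations[:1] run on the
# CPU and operations[1:] are offloaded.
print(split_index({"Decode": False,
                   "RandomHorizontalFlip": True,
                   "HWC2CHW": True}))  # -> 1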
def shuffle_to_shuffle_mode(shuffle):
"""
Shuffle Enum to Shuffle Mode
@@ -675,7 +741,7 @@ class Dataset:
@check_map
def map(self, operations, input_columns=None, output_columns=None, column_order=None,
num_parallel_workers=None, python_multiprocessing=False, cache=None, callbacks=None,
max_rowsize=16, offload=False):
max_rowsize=16, offload=None):
"""
Apply each operation in operations to this dataset.
@@ -717,7 +783,7 @@
callbacks (DSCallback, list[DSCallback], optional): List of Dataset callbacks to be called (Default=None).
max_rowsize (int, optional): Maximum size of row in MB that is used for shared memory allocation to copy
data between processes. This is only used if python_multiprocessing is set to True (Default=16).
offload (bool, optional): Flag to indicate whether offload is used (Default=False).
offload (bool, optional): Flag to indicate whether offload is used for this map call. If None, the
global auto_offload configuration is used (Default=None).
Returns:
@@ -809,9 +875,28 @@
... output_columns=["mod2", "mod3", "mod5", "mod7"],
... column_order=["mod7", "mod3", "col2"])
"""
can_split = False
non_offload_ops = []
offload_ops = []
if offload is not None:
offload_flag = offload
else:
offload_flag = get_auto_offload()
if offload_flag:
offload_flag, can_split, non_offload_ops, offload_ops = check_offload_map(operations, output_columns)
if can_split:
non_offload_map_ds = MapDataset(self, non_offload_ops, input_columns, output_columns, column_order,
num_parallel_workers, python_multiprocessing, cache, callbacks,
max_rowsize, offload=False)
return MapDataset(non_offload_map_ds, offload_ops, input_columns, output_columns, column_order,
num_parallel_workers, python_multiprocessing, cache, callbacks, max_rowsize,
offload=True)
return MapDataset(self, operations, input_columns, output_columns, column_order, num_parallel_workers,
python_multiprocessing, cache, callbacks, max_rowsize, offload)
python_multiprocessing, cache, callbacks, max_rowsize, offload_flag)
@check_filter
def filter(self, predicate, input_columns=None, num_parallel_workers=None):
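
Stepping back from the diff context: with offload=None as map()'s new default, an unset argument defers to the global auto_offload flag, while an explicit bool still wins per call. A usage sketch (the dataset path and which operations actually offload are assumptions):

import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as C

ds.config.set_auto_offload(True)  # picked up by map() calls that leave offload=None

# offload is not given, so the global setting applies. If, say, Decode is not
# offload-supported but the ops after it are, the map is split in two: Decode
# runs with offload=False and the remaining ops run with offload=True.
dataset_1 = ds.ImageFolderDataset("../data/dataset/testPK/data")
dataset_1 = dataset_1.map(operations=[C.Decode(), C.HWC2CHW()], input_columns="image")

# An explicit bool still overrides the global setting for a single call.
dataset_2 = ds.ImageFolderDataset("../data/dataset/testPK/data")
dataset_2 = dataset_2.map(operations=[C.Decode(), C.HWC2CHW()], input_columns="image",
                          offload=False)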
@@ -2586,8 +2671,8 @@ def _watch_dog(eot, pids):
return
# Sometimes subprocess may be zombie, so in 30s we can wait and do some useful tasks(waitpid).
wait_pid()
## multiprocessing.queue may hang in .get() forever when put() process was killed.
## We have to exit main process otherwise main process will hang.
# multiprocessing.queue may hang in .get() forever when put() process was killed.
# We have to exit main process otherwise main process will hang.
logger.critical("The subprocess of dataset may exit unexpected or be killed, "
"main process will exit.")
os.kill(os.getpid(), signal.SIGTERM)
@@ -2829,19 +2914,6 @@ class MapDataset(Dataset):
self.max_rowsize = max_rowsize
self.offload = offload
if self.offload is True:
offloadable_ops = get_offloadable_ops(operations)
cannot_offload = False
invalid_ops = []
for op in offloadable_ops:
if offloadable_ops[op] is not True:
cannot_offload = True
invalid_ops.append(op)
if cannot_offload is True:
logger.warning(("In map(), offload is set to True, but offload is not supported for the following "
"operation(s): {} \nSetting offload to False").format(*invalid_ops))
self.offload = False
def parse(self, children=None):
operations = []
for op in self.operations:

View File

@@ -808,7 +808,8 @@ def check_map(method):
type_check(python_multiprocessing, (bool,), "python_multiprocessing")
check_cache_option(cache)
type_check(max_rowsize, (int,), "max_rowsize")
type_check(offload, (bool,), "offload")
if offload is not None:
type_check(offload, (bool,), "offload")
if callbacks is not None:
if isinstance(callbacks, (list, tuple)):
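
The relaxed validator accepts None, which now means "defer to the global auto_offload flag", while still rejecting other non-bool values. A standalone sketch of that behavior (not the real type_check helper):

def check_offload_arg(offload):
    # Mirrors the relaxed check above: None means "use the global setting";
    # anything else must be a bool.
    if offload is not None and not isinstance(offload, bool):
        raise TypeError("Argument offload with value {} is not of type bool.".format(offload))

check_offload_arg(None)   # OK: map() falls back to ds.config.get_auto_offload()
check_offload_arg(True)   # OK
check_offload_arg("yes")  # raises TypeError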

View File

@@ -20,6 +20,7 @@ import mindspore.dataset.vision.c_transforms as C
DATA_DIR = "../data/dataset/testPK/data"
def test_offload():
"""
Feature: test map offload flag.
@@ -43,5 +44,30 @@ def test_offload():
np.testing.assert_array_equal(img_0, img_1)
def test_auto_offload():
"""
Feature: Test auto_offload config option.
Description: Input is image dataset.
Expectation: Output should be the same with auto_offload activated and deactivated.
"""
trans = [C.Decode(), C.HWC2CHW()]
# Dataset with config.auto_offload not activated
dataset_auto_disabled = ds.ImageFolderDataset(DATA_DIR)
dataset_auto_disabled = dataset_auto_disabled.map(operations=trans, input_columns="image")
dataset_auto_disabled = dataset_auto_disabled.batch(8, drop_remainder=True)
# Dataset with config.auto_offload activated
ds.config.set_auto_offload(True)
dataset_auto_enabled = ds.ImageFolderDataset(DATA_DIR)
dataset_auto_enabled = dataset_auto_enabled.map(operations=trans, input_columns="image")
dataset_auto_enabled = dataset_auto_enabled.batch(8, drop_remainder=True)
for (img_0, _), (img_1, _) in zip(dataset_auto_disabled.create_tuple_iterator(num_epochs=1, output_numpy=True),
dataset_auto_enabled.create_tuple_iterator(num_epochs=1, output_numpy=True)):
np.testing.assert_array_equal(img_0, img_1)
if __name__ == "__main__":
test_offload()
test_auto_offload()