diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc
index 08015e7527b..652d27566ea 100644
--- a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc
+++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc
@@ -35,6 +35,7 @@ ConfigManager::ConfigManager()
       num_parallel_workers_(kCfgParallelWorkers),
       worker_connector_size_(kCfgWorkerConnectorSize),
       op_connector_size_(kCfgOpConnectorSize),
+      rank_id_(kCfgDefaultRankId),
       seed_(kCfgDefaultSeed),
       monitor_sampling_interval_(kCfgMonitorSamplingInterval),
       callback_timout_(kCfgCallbackTimeout),
@@ -128,7 +129,7 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector
 
 uint32_t ConfigManager::seed() const { return seed_; }
 
-void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }
+void ConfigManager::set_rank_id(int32_t rank_id) { rank_id_ = rank_id; }
 
 void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }
 
diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.h b/mindspore/ccsrc/minddata/dataset/core/config_manager.h
index 380074fbf43..dc51fde54ad 100644
--- a/mindspore/ccsrc/minddata/dataset/core/config_manager.h
+++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.h
@@ -148,11 +148,11 @@ class ConfigManager {
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
   // @return Get the current device id, for one process, it's only with one rank_id.
-  uint32_t rank_id() const { return rank_id_; }
+  int32_t rank_id() const { return rank_id_; }
 
   // setter function
   // @param rank_id - Set the current device id
-  void set_rank_id(uint32_t rank_id);
+  void set_rank_id(int32_t rank_id);
 
   uint32_t seed() const;
 
@@ -210,7 +210,7 @@ class ConfigManager {
   // This rank_id is for numa and device_queue, one process work with only one rank_id,
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
-  uint32_t rank_id_;
+  int32_t rank_id_;
   uint32_t seed_;
   uint32_t monitor_sampling_interval_;
   uint32_t callback_timout_;
diff --git a/mindspore/ccsrc/minddata/dataset/core/constants.h b/mindspore/ccsrc/minddata/dataset/core/constants.h
index bc92fe15447..0e0f485ba04 100644
--- a/mindspore/ccsrc/minddata/dataset/core/constants.h
+++ b/mindspore/ccsrc/minddata/dataset/core/constants.h
@@ -84,6 +84,7 @@ constexpr uint32_t kCfgRowsPerBuffer = 1;
 constexpr uint32_t kCfgParallelWorkers = 4;
 constexpr uint32_t kCfgWorkerConnectorSize = 16;
 constexpr uint32_t kCfgOpConnectorSize = 16;
+constexpr int32_t kCfgDefaultRankId = -1;
 constexpr uint32_t kCfgDefaultSeed = std::mt19937::default_seed;
 constexpr uint32_t kCfgMonitorSamplingInterval = 10;
 constexpr uint32_t kCfgCallbackTimeout = 60;  // timeout value for callback in seconds
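Note: the ConfigManager and constants.h changes above move rank_id from uint32_t to int32_t so that the new kCfgDefaultRankId = -1 can act as an "unset" sentinel. With an unsigned field, -1 wraps to 4294967295 and a `rank_id_ >= 0` guard can never reject the default. A minimal standalone sketch of that pitfall (not MindSpore code; names here are illustrative):

```cpp
#include <cstdint>
#include <iostream>

// Illustrative constant mirroring kCfgDefaultRankId in constants.h.
constexpr int32_t kDefaultRankId = -1;

int main() {
  int32_t rank_id_signed = kDefaultRankId;
  uint32_t rank_id_unsigned = kDefaultRankId;  // -1 wraps to 4294967295

  // Signed field: the guard correctly detects "rank id was never set".
  std::cout << (rank_id_signed >= 0) << "\n";    // prints 0 -> NUMA bind is skipped
  // Unsigned field: the same guard is always true, so a bogus rank id would be used.
  std::cout << (rank_id_unsigned >= 0) << "\n";  // prints 1 -> would bind to a wrong node
  return 0;
}
```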
diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
index 5e0ce6b2a41..f39ec7cf666 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
@@ -18,7 +18,7 @@
 #include
 #include
 #include
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
 #include <numa.h>
 #endif
 #include "minddata/dataset/engine/datasetops/dataset_op.h"
@@ -46,7 +46,7 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) {
   prepare_flags_ = kDePrepNone;
   profiling_manager_ = std::make_unique<ProfilingManager>(this);
   optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
   rank_id_ = cfg->rank_id();
 #endif
@@ -145,7 +145,7 @@ Status ExecutionTree::Launch() {
   // opencv limit too many threads
 #ifndef ENABLE_ANDROID
 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   // Here we do numa bind for performance optimization, as our test result,
   // if we do numa bind when get_dataset_size launch a tree, we'll get a
   // better performance than only we do numa bind at the time _To_Device
@@ -155,7 +155,10 @@ Status ExecutionTree::Launch() {
   // Now we only test pass in GPU scenario, we've not tested D scenario,
   // without enough test we don't suggest numa feature open in D scenario
   int numa_node_max_id = numa_max_node();
-  if (numa_node_max_id >= 0 && rank_id_ >= 0) {
+  if (numa_node_max_id < 0) {
+    RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
+  }
+  if (rank_id_ >= 0) {
     uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1));
     auto bm = numa_allocate_nodemask();
     numa_bitmask_clearall(bm);
@@ -163,7 +166,7 @@ Status ExecutionTree::Launch() {
     numa_bind(bm);
     numa_bitmask_free(bm);
   } else {
-    RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
+    MS_LOG(INFO) << "Numa bind feature doesn't work now.";
   }
 #endif
   int32_t thread_num = get_nprocs();
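Note: the Launch() change above separates the two failure modes: a negative numa_max_node() is now reported as an error, while an unset rank_id (the new -1 default) only logs a message and skips binding. A standalone libnuma sketch of the same bind pattern, with a hypothetical rank_id value and simplified error handling (link with -lnuma):

```cpp
// Illustration only; MindSpore reads rank_id from ConfigManager instead of hardcoding it.
// Build with: g++ numa_bind_demo.cpp -lnuma
#include <numa.h>
#include <cstdint>
#include <cstdio>

int main() {
  if (numa_available() < 0) {  // libnuma requires a NUMA-capable system
    std::fprintf(stderr, "NUMA is not available on this machine\n");
    return 1;
  }
  int32_t rank_id = 0;  // hypothetical; -1 would mean "not set" and skip binding
  int numa_node_max_id = numa_max_node();
  if (numa_node_max_id < 0) {
    std::fprintf(stderr, "Get numa max node failed\n");
    return 1;
  }
  if (rank_id >= 0) {
    // Map the rank onto one of the available NUMA nodes and bind to it.
    uint32_t numa_bind_id = static_cast<uint32_t>(rank_id % (numa_node_max_id + 1));
    struct bitmask *bm = numa_allocate_nodemask();
    numa_bitmask_clearall(bm);
    numa_bitmask_setbit(bm, numa_bind_id);
    numa_bind(bm);  // restrict memory allocation and CPU affinity to this node
    numa_bitmask_free(bm);
    std::printf("Bound process to NUMA node %u\n", numa_bind_id);
  } else {
    std::printf("rank_id not set, skip NUMA binding\n");
  }
  return 0;
}
```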
diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
index 8afea65e7d9..adf0dc98bec 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
@@ -282,11 +282,11 @@ class ExecutionTree {
   bool optimize_;  // Flag to enable optional optimizations
   std::function pre_pass_override_;  // function ptr that overrides pre pass, called in PrePrepare()
   bool partially_prepare_;  // Temp: during migration to IR, if true, run remaining passes.
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   // This rank_id is for numa and device_queue, one process work with only one rank_id,
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
-  uint32_t rank_id_;
+  int32_t rank_id_;
 #endif
 };
 }  // namespace dataset
diff --git a/mindspore/dataset/core/config.py b/mindspore/dataset/core/config.py
index 5f3d4fdad7c..ac44eb9cd12 100644
--- a/mindspore/dataset/core/config.py
+++ b/mindspore/dataset/core/config.py
@@ -41,7 +41,7 @@ def _init_device_info():
     """
     from mindspore import context
     from mindspore.parallel._auto_parallel_context import auto_parallel_context
-    from mindspore.parallel._utils import _get_global_rank
+    from mindspore.parallel._utils import _get_global_rank, _get_device_num
     if context.get_context("device_target") == "GPU":
         rank_id = _get_global_rank()
         parallel_mode = auto_parallel_context().get_parallel_mode()
@@ -52,6 +52,12 @@ def _init_device_info():
         if cuda_id != rank_id:
             rank_id = cuda_id
         _config.set_rank_id(rank_id)
+    elif context.get_context("device_target") == "Ascend":
+        rank_id = _get_global_rank()
+        device_num = _get_device_num()
+        # Ascend only support multi-process scenario
+        if device_num > 1:
+            _config.set_rank_id(rank_id)
 
 
 def set_seed(seed):
diff --git a/model_zoo/official/cv/alexnet/src/dataset.py b/model_zoo/official/cv/alexnet/src/dataset.py
index b43f2a553c6..df5df96505a 100644
--- a/model_zoo/official/cv/alexnet/src/dataset.py
+++ b/model_zoo/official/cv/alexnet/src/dataset.py
@@ -85,9 +85,11 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training=
     device_num, rank_id = _get_rank_info()
     cfg = alexnet_imagenet_cfg
 
-    if num_parallel_workers is None:
-        num_parallel_workers = int(64 / device_num)
-    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=num_parallel_workers,
+    num_parallel_workers = 16
+    if device_num == 1:
+        num_parallel_workers = 48
+        ds.config.set_prefetch_size(8)
+    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=4,
                                      shuffle=shuffle, sampler=sampler, class_indexing=class_indexing,
                                      num_shards=device_num, shard_id=rank_id)
 
@@ -113,18 +115,14 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training=
         CV.HWC2CHW()
     ]
 
-    transform_label = [C.TypeCast(mstype.int32)]
-
     data_set = data_set.map(input_columns="image", num_parallel_workers=num_parallel_workers,
                             operations=transform_img)
-    data_set = data_set.map(input_columns="label", num_parallel_workers=num_parallel_workers,
-                            operations=transform_label)
-
-    num_parallel_workers2 = int(16 / device_num)
-    data_set = data_set.batch(batch_size, num_parallel_workers=num_parallel_workers2, drop_remainder=True)
+    data_set = data_set.batch(batch_size, drop_remainder=True)
 
     # apply dataset repeat operation
-    data_set = data_set.repeat(repeat_num)
+    if repeat_num > 1:
+        data_set = data_set.repeat(repeat_num)
 
     return data_set