minddata ascend add numa bind

forked from mindspore-Ecosystem/mindspore
parent 1150ae3376
commit c35782dd86
config_manager.cc
@@ -35,6 +35,7 @@ ConfigManager::ConfigManager()
       num_parallel_workers_(kCfgParallelWorkers),
       worker_connector_size_(kCfgWorkerConnectorSize),
       op_connector_size_(kCfgOpConnectorSize),
+      rank_id_(kCfgDefaultRankId),
       seed_(kCfgDefaultSeed),
       monitor_sampling_interval_(kCfgMonitorSamplingInterval),
       callback_timout_(kCfgCallbackTimeout),
@@ -128,7 +129,7 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector

 uint32_t ConfigManager::seed() const { return seed_; }

-void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }
+void ConfigManager::set_rank_id(int32_t rank_id) { rank_id_ = rank_id; }

 void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }

config_manager.h
@@ -148,11 +148,11 @@ class ConfigManager {
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
   // @return Get the current device id, for one process, it's only with one rank_id.
-  uint32_t rank_id() const { return rank_id_; }
+  int32_t rank_id() const { return rank_id_; }

   // setter function
   // @param rank_id - Set the current device id
-  void set_rank_id(uint32_t rank_id);
+  void set_rank_id(int32_t rank_id);

   uint32_t seed() const;
@@ -210,7 +210,7 @@ class ConfigManager {
   // This rank_id is for numa and device_queue, one process work with only one rank_id,
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
-  uint32_t rank_id_;
+  int32_t rank_id_;
   uint32_t seed_;
   uint32_t monitor_sampling_interval_;
   uint32_t callback_timout_;

constants.h
@@ -84,6 +84,7 @@ constexpr uint32_t kCfgRowsPerBuffer = 1;
 constexpr uint32_t kCfgParallelWorkers = 4;
 constexpr uint32_t kCfgWorkerConnectorSize = 16;
 constexpr uint32_t kCfgOpConnectorSize = 16;
+constexpr int32_t kCfgDefaultRankId = -1;
 constexpr uint32_t kCfgDefaultSeed = std::mt19937::default_seed;
 constexpr uint32_t kCfgMonitorSamplingInterval = 10;
 constexpr uint32_t kCfgCallbackTimeout = 60;  // timeout value for callback in seconds
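
Note on the type change threaded through the hunks above: the new kCfgDefaultRankId = -1 sentinel is only usable because rank_id is switched from uint32_t to int32_t at the same time. Assigning -1 to a uint32_t wraps to 4294967295, so a "not configured" check like the rank_id_ >= 0 gate in ExecutionTree::Launch() could never fail. A standalone C++ sketch of the pitfall (demo code, not part of the commit):

// demo.cc: why the sentinel needs a signed type
#include <cstdint>
#include <iostream>

constexpr int32_t kSignedDefault = -1;               // sentinel survives
constexpr uint32_t kUnsignedDefault = uint32_t(-1);  // wraps to 4294967295

int main() {
  std::cout << (kSignedDefault >= 0) << "\n";     // 0: "not set" is detectable
  std::cout << (kUnsignedDefault >= 0u) << "\n";  // 1: unsigned is never < 0
  return 0;
}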

execution_tree.cc
@@ -18,7 +18,7 @@
 #include <string>
 #include <utility>
 #include <limits>
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
 #include <numa.h>
 #endif
 #include "minddata/dataset/engine/datasetops/dataset_op.h"
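
The guard edit above is the heart of the commit and recurs in every following hunk: the libnuma path used to be compiled only into GPU builds (ENABLE_GPUQUE); adding ENABLE_TDTQUE, the flag MindSpore sets for Ascend builds using the TDT device queue, pulls the same NUMA-bind code into Ascend builds. A compressed illustration follows; the #defines are hard-coded stand-ins for what the build system normally passes:

// guard_demo.cc: which build flavors now reach the numa code path
#define NUMA_ENABLED
#define ENABLE_TDTQUE  // Ascend/TDT build; a GPU build defines ENABLE_GPUQUE

#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
// Before this commit the condition was NUMA_ENABLED && ENABLE_GPUQUE only,
// so this block was skipped on Ascend; now it is compiled in.
#pragma message("numa bind path enabled")
#endif

int main() { return 0; }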
@@ -46,7 +46,7 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) {
   prepare_flags_ = kDePrepNone;
   profiling_manager_ = std::make_unique<ProfilingManager>(this);
   optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
   rank_id_ = cfg->rank_id();
 #endif
@@ -145,7 +145,7 @@ Status ExecutionTree::Launch() {
   // opencv limit too many threads
 #ifndef ENABLE_ANDROID
 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   // Here we do numa bind for performance optimization, as our test result,
   // if we do numa bind when get_dataset_size launch a tree, we'll get a
   // better performance than only we do numa bind at the time _To_Device
@@ -155,7 +155,10 @@ Status ExecutionTree::Launch() {
   // Now we only test pass in GPU scenario, we've not tested D scenario,
   // without enough test we don't suggest numa feature open in D scenario
   int numa_node_max_id = numa_max_node();
-  if (numa_node_max_id >= 0 && rank_id_ >= 0) {
+  if (numa_node_max_id < 0) {
+    RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
+  }
+  if (rank_id_ >= 0) {
     uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1));
     auto bm = numa_allocate_nodemask();
     numa_bitmask_clearall(bm);
@@ -163,7 +166,7 @@ Status ExecutionTree::Launch() {
     numa_bind(bm);
     numa_bitmask_free(bm);
   } else {
-    RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
+    MS_LOG(INFO) << "Numa bind feature doesn't work now.";
   }
 #endif
   int32_t thread_num = get_nprocs();
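
For reference, the calls in Launch() above follow the standard libnuma idiom for pinning a process to a single node: query the highest node id, build a node mask with exactly one bit set, then numa_bind(). The sketch below mirrors that flow as a standalone program (link with -lnuma); the numa_bitmask_setbit call sits in lines elided from this diff, and rank_id here is a local stand-in for the value ConfigManager supplies:

// numa_demo.cc: bind the current process to one NUMA node
// build: g++ numa_demo.cc -lnuma
#include <numa.h>
#include <cstdint>
#include <cstdio>

int main() {
  if (numa_available() < 0) {  // kernel/libnuma support check
    std::fprintf(stderr, "NUMA not available\n");
    return 1;
  }
  int32_t rank_id = 0;  // stand-in for cfg->rank_id()
  int numa_node_max_id = numa_max_node();
  if (numa_node_max_id < 0) {
    std::fprintf(stderr, "Get numa max node failed.\n");
    return 1;
  }
  // Fold the rank onto an existing node id, exactly as the patch does.
  uint32_t numa_bind_id = static_cast<uint32_t>(rank_id % (numa_node_max_id + 1));
  struct bitmask *bm = numa_allocate_nodemask();
  numa_bitmask_clearall(bm);              // start from an empty mask
  numa_bitmask_setbit(bm, numa_bind_id);  // select exactly one node
  numa_bind(bm);                          // restrict CPUs and allocations to it
  numa_bitmask_free(bm);
  std::printf("bound to numa node %u\n", numa_bind_id);
  return 0;
}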

execution_tree.h
@@ -282,11 +282,11 @@ class ExecutionTree {
   bool optimize_;  // Flag to enable optional optimizations
   std::function<OptPass(OptPass)> pre_pass_override_;  // function ptr that overrides pre pass, called in PrePrepare()
   bool partially_prepare_;  // Temp: during migration to IR, if true, run remaining passes.
-#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
+#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
   // This rank_id is for numa and device_queue, one process work with only one rank_id,
   // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
   // but for distribute scenario, this rank_id come from _get_global_rank() in python
-  uint32_t rank_id_;
+  int32_t rank_id_;
 #endif
 };
 }  // namespace dataset

config.py
@@ -41,7 +41,7 @@ def _init_device_info():
     """
     from mindspore import context
     from mindspore.parallel._auto_parallel_context import auto_parallel_context
-    from mindspore.parallel._utils import _get_global_rank
+    from mindspore.parallel._utils import _get_global_rank, _get_device_num
     if context.get_context("device_target") == "GPU":
         rank_id = _get_global_rank()
         parallel_mode = auto_parallel_context().get_parallel_mode()
@@ -52,6 +52,12 @@ def _init_device_info():
         if cuda_id != rank_id:
             rank_id = cuda_id
         _config.set_rank_id(rank_id)
+    elif context.get_context("device_target") == "Ascend":
+        rank_id = _get_global_rank()
+        device_num = _get_device_num()
+        # Ascend only support multi-process scenario
+        if device_num > 1:
+            _config.set_rank_id(rank_id)


 def set_seed(seed):

dataset.py (AlexNet)
@@ -85,9 +85,11 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training=
     device_num, rank_id = _get_rank_info()
     cfg = alexnet_imagenet_cfg

-    if num_parallel_workers is None:
-        num_parallel_workers = int(64 / device_num)
-    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=num_parallel_workers,
+    num_parallel_workers = 16
+    if device_num == 1:
+        num_parallel_workers = 48
+        ds.config.set_prefetch_size(8)
+    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=4,
                                      shuffle=shuffle, sampler=sampler, class_indexing=class_indexing,
                                      num_shards=device_num, shard_id=rank_id)
@@ -113,18 +115,14 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training=
         CV.HWC2CHW()
     ]

     transform_label = [C.TypeCast(mstype.int32)]

     data_set = data_set.map(input_columns="image", num_parallel_workers=num_parallel_workers,
                             operations=transform_img)
     data_set = data_set.map(input_columns="label", num_parallel_workers=num_parallel_workers,
                             operations=transform_label)

-    num_parallel_workers2 = int(16 / device_num)
-    data_set = data_set.batch(batch_size, num_parallel_workers=num_parallel_workers2, drop_remainder=True)
+    data_set = data_set.batch(batch_size, drop_remainder=True)

     # apply dataset repeat operation
-    data_set = data_set.repeat(repeat_num)
+    if repeat_num > 1:
+        data_set = data_set.repeat(repeat_num)

     return data_set