!11139 [MD][BugFix] MindData remove the compile link of _c_dataengine and numa

From: @xiefangqi
Reviewed-by: 
Signed-off-by:
mindspore-ci-bot 2021-01-15 09:12:55 +08:00 committed by Gitee
commit 0e03630503
14 changed files with 259 additions and 68 deletions

View File

@@ -35,26 +35,6 @@ if (MS_BUILD_GRPC)
message(STATUS "Cache is enabled")
endif()
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# Try to find numa header file and its library
FIND_PATH(NUMA_INCLUDE_DIR numa.h)
MESSAGE("Numa include dir is: ${NUMA_INCLUDE_DIR}")
FIND_LIBRARY(NUMA_LIBRARY NAMES libnuma.so)
MESSAGE("Numa library is: ${NUMA_LIBRARY}")
FIND_PACKAGE_HANDLE_STANDARD_ARGS(NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if (NUMA_FOUND)
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()
# code coverage
# option(ENABLE_COVERAGE "Enable code coverage report" OFF)
# if (ENABLE_COVERAGE)
@@ -269,9 +249,6 @@ else ()
target_link_libraries(_c_dataengine PRIVATE -ldl ${SECUREC_LIBRARY})
endif ()
target_link_libraries(_c_dataengine PUBLIC mindspore::sentencepiece)
if (NUMA_FOUND)
target_link_libraries(_c_dataengine PUBLIC numa)
endif()
endif()
target_link_libraries(_c_dataengine PUBLIC mindspore::jpeg_turbo mindspore::turbojpeg mindspore::opencv_core mindspore::opencv_imgcodecs

View File

@@ -39,6 +39,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_callback_timeout", &ConfigManager::callback_timeout)
.def("get_monitor_sampling_interval", &ConfigManager::monitor_sampling_interval)
.def("get_num_parallel_workers", &ConfigManager::num_parallel_workers)
.def("get_numa_enable", &ConfigManager::numa_enable)
.def("set_numa_enable", &ConfigManager::set_numa_enable)
.def("get_op_connector_size", &ConfigManager::op_connector_size)
.def("get_rows_per_buffer", &ConfigManager::rows_per_buffer)
.def("get_seed", &ConfigManager::seed)

View File

@@ -37,6 +37,7 @@ ConfigManager::ConfigManager()
op_connector_size_(kCfgOpConnectorSize),
rank_id_(kCfgDefaultRankId),
seed_(kCfgDefaultSeed),
numa_enable_(false),
monitor_sampling_interval_(kCfgMonitorSamplingInterval),
callback_timout_(kCfgCallbackTimeout),
cache_host_(kCfgDefaultCacheHost),
@@ -131,6 +132,8 @@ uint32_t ConfigManager::seed() const { return seed_; }
void ConfigManager::set_rank_id(int32_t rank_id) { rank_id_ = rank_id; }
void ConfigManager::set_numa_enable(bool numa_enable) { numa_enable_ = numa_enable; }
void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }
void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }

View File

@@ -143,6 +143,16 @@ class ConfigManager {
/// \param prefetch_size
void set_prefetch_size(int32_t prefetch_size);
/// setter function
/// \param numa_enable
void set_numa_enable(bool numa_enable);
/// getter function
/// Now we want to separate the numa link from _c_dataengine in the CMakeLists,
/// so we let the user choose whether to turn the numa switch on.
/// \return The current numa switch state.
bool numa_enable() const { return numa_enable_; }
// getter function
// This rank_id is for numa and device_queue, one process works with only one rank_id;
// for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
@@ -217,6 +227,7 @@ class ConfigManager {
std::string cache_host_;
int32_t cache_port_;
int32_t num_connections_;
bool numa_enable_;
int32_t prefetch_size_;
bool auto_num_workers_;
const int32_t num_cpu_threads_;

View File

@@ -6,6 +6,26 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# Try to find numa header file and its library
FIND_PATH(NUMA_INCLUDE_DIR numa.h)
MESSAGE("Numa include dir is: ${NUMA_INCLUDE_DIR}")
FIND_LIBRARY(NUMA_LIBRARY NAMES libnuma.so)
MESSAGE("Numa library is: ${NUMA_LIBRARY}")
FIND_PACKAGE_HANDLE_STANDARD_ARGS(NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if (NUMA_FOUND)
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()
if (NUMA_FOUND)
ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT)
endif ()

View File

@@ -18,13 +18,9 @@
#include <string>
#include <utility>
#include <limits>
#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
#include <numa.h>
#endif
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/shuffle_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/engine/opt/pre/removal_pass.h"
#ifndef ENABLE_ANDROID
@@ -36,6 +32,10 @@
#include "minddata/dataset/engine/opt/pre/epoch_injection_pass.h"
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/engine/perf/monitor.h"
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
#include "minddata/dataset/util/numa_interface.h"
#endif
#include "minddata/dataset/util/task_manager.h"
namespace mindspore {
namespace dataset {
@@ -43,19 +43,28 @@ namespace dataset {
ExecutionTree::ExecutionTree() : id_count_(0), tree_state_(kDeTStateInit), prepare_flags_(kDePrepNone) {
tg_ = std::make_unique<TaskGroup>();
profiling_manager_ = std::make_unique<ProfilingManager>(this);
#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id();
numa_enable_ = cfg->numa_enable();
handle_ = nullptr;
#endif
}
// Destructor
ExecutionTree::~ExecutionTree() {
#ifdef ENABLE_TDTQUE
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
if (numa_enable_) {
if (handle_ != nullptr) {
ReleaseLibrary(handle_);
}
}
#if defined(ENABLE_TDTQUE)
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root_.get());
if (op != nullptr) {
op->StopWaiting();
}
#endif
#endif
(void)tg_->ServiceStop();
}
@@ -140,30 +149,26 @@ void ExecutionTree::PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp
// Start the execution of the tree
Status ExecutionTree::Launch() {
// opencv limit too many threads
#ifndef ENABLE_ANDROID
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) && !defined(ENABLE_ANDROID)
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
// Here we do numa bind for performance optimization. According to our test results,
// doing the numa bind when get_dataset_size launches a tree gives better
// performance than binding only when _To_Device launches a tree. Our numa bind
// is a process-level bind of both cpu and memory, and we choose the numa node
// with a polling logic:
// numa_bind_id = rank_id_ % (numa_max_node() + 1)
// So far this is only verified in the GPU scenario; the D scenario has not been
// tested, so we don't suggest enabling the numa feature there yet.
int numa_node_max_id = numa_max_node();
if (numa_node_max_id < 0) {
RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
// Now we only support the GPU scenario and the single-process scenario of Ascend.
// The target_link of numa with _c_dataengine has been removed, and the user can
// use a config api to control whether to enable the numa feature.
if (numa_enable_ && rank_id_ >= 0) {
if (handle_ == nullptr) {
handle_ = GetNumaAdapterHandle();
if (handle_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa package not found.");
}
if (rank_id_ >= 0) {
uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1));
auto bm = numa_allocate_nodemask();
numa_bitmask_clearall(bm);
numa_bitmask_setbit(bm, numa_bind_id);
numa_bind(bm);
numa_bitmask_free(bm);
} else {
MS_LOG(INFO) << "Numa bind feature doesn't work now.";
}
RETURN_IF_NOT_OK(NumaBind(handle_, rank_id_));
MS_LOG(INFO) << "Numa bind memory and cpu successful.";
}
#endif
int32_t thread_num = get_nprocs();
@@ -176,7 +181,7 @@ Status ExecutionTree::Launch() {
else
cv::setNumThreads(thread_num);
#endif
#endif
// Tree must be built and prepared before it can be launched!
if (tree_state_ != kDeTStateReady) {
std::string err_msg =
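As a quick illustration of the polling logic described in the comment above (numa_bind_id = rank_id_ % (numa_max_node() + 1)), here is a small Python sketch that is not part of the commit; the node count of 4 is just an assumed example value.

# Illustrative only: how the polling logic spreads rank_ids across numa nodes.
# Assume numa_max_node() returned 3, i.e. the machine has 4 numa nodes (0..3).
numa_max_node = 3
num_nodes = numa_max_node + 1
for rank_id in range(8):  # e.g. 8 worker processes
    numa_bind_id = rank_id % num_nodes
    print("rank", rank_id, "-> numa node", numa_bind_id)
# rank 0 -> node 0, rank 1 -> node 1, ..., rank 4 -> node 0 again, and so on.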

View File

@@ -257,11 +257,13 @@ class ExecutionTree {
int32_t num_epochs_; // Total number of epochs to run for this tree
std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager
bool partially_prepare_; // Temp: during migration to IR, if true, run remaining passes.
#if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE))
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
// This rank_id is for numa and device_queue, one process works with only one rank_id;
// for the standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
// but for the distributed scenario, this rank_id comes from _get_global_rank() in python
int32_t rank_id_;
bool numa_enable_;
void *handle_;
#endif
};
} // namespace dataset

View File

@@ -1,21 +1,6 @@
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
add_library(utils OBJECT
arena.cc
buddy.cc
circular_pool.cc
data_helper.cc
memory_pool.cc
cond_var.cc
intrp_service.cc
task.cc
task_manager.cc
service.cc
services.cc
lock.cc
semaphore.cc
status.cc
slice.cc
path.cc
wait_post.cc
sig_handler.cc)
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Linux")
LIST(REMOVE_ITEM _CURRENT_SRC_FILES numa_interface.cc)
endif()
add_library(utils OBJECT ${_CURRENT_SRC_FILES})

View File

@@ -0,0 +1,100 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/util/numa_interface.h"
#include <dlfcn.h>
namespace mindspore {
namespace dataset {
inline void *LoadLibrary(const char *name) {
auto handle = dlopen(name, RTLD_LAZY | RTLD_LOCAL);
return handle;
}
inline void *GetNumaAdapterFunc(void *handle, const char *name) {
void *func = dlsym(handle, name);
return func;
}
void ReleaseLibrary(void *handle) {
if (handle != nullptr) {
(void)dlclose(handle);
}
}
void *GetNumaAdapterHandle() {
void *handle = LoadLibrary("libnuma.so");
return handle;
}
typedef int (*GetNumaMaxNodeFunc)(void);
typedef struct bitmask *(*NumaAllocateNodemaskFunc)(void);
typedef struct bitmask *(*NumaBitmaskClearallFunc)(struct bitmask *);
typedef struct bitmask *(*NumaBitmaskSetbitFunc)(struct bitmask *, unsigned int);
typedef void (*NumaBindFunc)(struct bitmask *);
typedef void (*NumaBitmaskFreeFunc)(struct bitmask *);
Status NumaBind(void *handle, const int32_t &rank_id) {
if (handle == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa package not found.");
}
auto numa_max_node_func_pointer = GetNumaAdapterFunc(handle, "numa_max_node");
if (numa_max_node_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_max_node not found.");
}
auto numa_allocate_nodemask_func_pointer = GetNumaAdapterFunc(handle, "numa_allocate_nodemask");
if (numa_allocate_nodemask_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_allocate_nodemask not found.");
}
auto numa_bitmask_clearall_func_pointer = GetNumaAdapterFunc(handle, "numa_bitmask_clearall");
if (numa_bitmask_clearall_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_bitmask_clearall not found.");
}
auto numa_bitmask_setbit_func_pointer = GetNumaAdapterFunc(handle, "numa_bitmask_setbit");
if (numa_bitmask_setbit_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_bitmask_setbit not found.");
}
auto numa_bind_func_pointer = GetNumaAdapterFunc(handle, "numa_bind");
if (numa_bind_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_bind not found.");
}
auto numa_bitmask_free_func_pointer = GetNumaAdapterFunc(handle, "numa_bitmask_free");
if (numa_bitmask_free_func_pointer == nullptr) {
RETURN_STATUS_UNEXPECTED("Numa api: numa_bitmask_free not found.");
}
auto numa_max_node_func = reinterpret_cast<GetNumaMaxNodeFunc>(numa_max_node_func_pointer);
auto numa_allocate_nodemask_func = reinterpret_cast<NumaAllocateNodemaskFunc>(numa_allocate_nodemask_func_pointer);
auto numa_bitmask_clearall_func = reinterpret_cast<NumaBitmaskClearallFunc>(numa_bitmask_clearall_func_pointer);
auto numa_bitmask_setbit_func = reinterpret_cast<NumaBitmaskSetbitFunc>(numa_bitmask_setbit_func_pointer);
auto numa_bind_func = reinterpret_cast<NumaBindFunc>(numa_bind_func_pointer);
auto numa_bitmask_free_func = reinterpret_cast<NumaBitmaskFreeFunc>(numa_bitmask_free_func_pointer);
int numa_node_max_id = numa_max_node_func();
if (numa_node_max_id < 0) {
RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
}
if (rank_id >= 0) {
uint32_t numa_bind_id = static_cast<uint32_t>(rank_id % (numa_node_max_id + 1));
auto bm = numa_allocate_nodemask_func();
numa_bitmask_clearall_func(bm);
numa_bitmask_setbit_func(bm, numa_bind_id);
numa_bind_func(bm);
numa_bitmask_free_func(bm);
} else {
RETURN_STATUS_UNEXPECTED("Value error, rank_id is a negative value.");
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@@ -0,0 +1,44 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_NUMA_INTERFACE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_NUMA_INTERFACE_H_
#include "minddata/dataset/util/status.h"
namespace mindspore {
namespace dataset {
struct bitmask {
uint64_t size;
uint64_t *maskp;
};
// Now we separate the link between _c_dataengine and numa,
// and use dlopen("libnuma") instead. This function returns a
// handle on which you can call NumaBind and ReleaseLibrary.
void *GetNumaAdapterHandle();
// In total this function will:
// 1. Get the function pointers of the numa api
// 2. Do numa_bind
Status NumaBind(void *handle, const int32_t &rank_id);
// Release the numa handle to avoid memory leaks; the handle
// should not be nullptr when we use it.
void ReleaseLibrary(void *handle);
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_NUMA_INTERFACE_H_
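For comparison, the same dlopen-style idea can be sketched outside C++; the snippet below is a hypothetical Python/ctypes illustration (not part of this commit) that lazily loads libnuma, looks up the same symbols the adapter above resolves, and binds the process to rank_id % (numa_max_node() + 1), skipping the bind entirely when the library is absent.

# Hypothetical illustration of the same lazy-loading approach; not part of this commit.
import ctypes
import ctypes.util

def numa_bind_for_rank(rank_id):
    path = ctypes.util.find_library("numa")  # None when libnuma is not installed
    if path is None or rank_id < 0:
        return False  # no hard dependency: simply skip the numa bind
    lib = ctypes.CDLL(path)
    lib.numa_allocate_nodemask.restype = ctypes.c_void_p
    max_node = lib.numa_max_node()
    if max_node < 0:
        return False
    bind_id = rank_id % (max_node + 1)
    bm = ctypes.c_void_p(lib.numa_allocate_nodemask())
    lib.numa_bitmask_clearall(bm)
    lib.numa_bitmask_setbit(bm, bind_id)
    lib.numa_bind(bm)          # process-level bind of both cpu and memory
    lib.numa_bitmask_free(bm)
    return True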

View File

@@ -62,6 +62,10 @@ def _init_device_info():
rank_id = int(env_rank_id.strip())
if rank_size > 1:
_config.set_rank_id(rank_id)
# Now a single process under Ascend mode doesn't support numa bind, for performance considerations.
if _config.get_numa_enable() is True and rank_size == 1:
raise ValueError("single process under Ascend mode doesn't support numa bind for "
"performance consideration.")
def set_seed(seed):
@@ -163,6 +167,37 @@ def get_num_parallel_workers():
return _config.get_num_parallel_workers()
def set_numa_enable(numa_enable):
"""
Set the default state of numa enabled.
Args:
numa_enable (bool): Whether to use the numa bind feature.
Raises:
TypeError: If numa_enable is not a boolean data type.
Examples:
>>> # Set a new global configuration value for the state of numa enabled.
>>> # Now parallel dataset operators will run with the numa bind function
>>> ds.config.set_numa_enable(True)
"""
if not isinstance(numa_enable, bool):
raise TypeError("numa_enable must be a boolean dtype.")
_config.set_numa_enable(numa_enable)
def get_numa_enable():
"""
Get the default state of numa enabled.
This is the DEFAULT numa enabled value used for all processes.
Returns:
boolean, the default state of numa enabled
"""
return _config.get_numa_enable()
def set_monitor_sampling_interval(interval):
"""
Set the default interval (in milliseconds) for monitor sampling.
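Putting the pieces together, the new switch is used from the Python side roughly as in the sketch below; this is a usage sketch based on the docstrings above, with a placeholder dataset path, not code taken from the commit.

# Usage sketch for the new config switch; "/path/to/imagenet" is a placeholder.
import mindspore.dataset as ds

ds.config.set_numa_enable(True)      # opt in to process-level numa binding
print(ds.config.get_numa_enable())   # True

# Parallel dataset operators created after this point run with the numa bind feature.
data_set = ds.ImageFolderDataset("/path/to/imagenet", num_parallel_workers=4, shuffle=True)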

View File

@@ -254,6 +254,10 @@ if (BUILD_MINDDATA STREQUAL "full")
list(REMOVE_ITEM MINDDATA_KERNELS_DATA_SRC_FILES
"${MINDDATA_DIR}/kernels/data/unique_op.cc"
)
list(REMOVE_ITEM MINDDATA_UTIL_SRC_FILES
"${MINDDATA_DIR}/util/numa_interface.cc"
)
include_directories("${CMAKE_BINARY_DIR}/minddata/dataset/engine/cache")
if (BUILD_MINDDATA_EXAMPLE AND (PLATFORM_ARM32 OR PLATFORM_ARM64))

View File

@@ -89,6 +89,8 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training=
if device_num == 1:
num_parallel_workers = 48
ds.config.set_prefetch_size(8)
else:
ds.config.set_numa_enable(True)
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=4,
shuffle=shuffle, sampler=sampler, class_indexing=class_indexing,
num_shards=device_num, shard_id=rank_id)

View File

@@ -94,6 +94,7 @@ class MyTimeMonitor(Callback):
def create_dataset(dataset_path, do_train, repeat_num=1, batch_size=32, target="GPU", dtype="fp16",
device_num=1):
ds.config.set_numa_enable(True)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=4, shuffle=True)
else: