!34704 [ASCEND/GPU] Dynamic data queue to support data sinking of dynamic-shape data without min/max shapes

Merge pull request !34704 from wYann/dynamic_queue
i-robot 2022-06-14 05:48:16 +00:00 committed by Gitee
commit d7dab84181
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
58 changed files with 1469 additions and 615 deletions
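In short, this change retires the GPU-only GpuBufferMgr in favour of a device-agnostic data queue under runtime/data_queue, reached from the dataset side through mindspore::DataQueueHandler callbacks, so a dynamic-shape pipeline can be sunk without declaring min/max shapes up front. The snippet below is a condensed, hedged sketch of the producer path as it appears in device_queue_op.cc further down; it assumes the internal MindSpore headers, the variable names are placeholders, and it is not a literal excerpt.

// Sketch only: mirrors the DataQueueHandler calls used in the diff below.
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::DataQueueItem;

bool SinkOneRow(const std::string &channel, void *host_buf, size_t nbytes,
                const std::vector<int64_t> &shape, const std::string &type_name, unsigned int wait_time) {
  auto release = [](void *, int32_t) {};  // nothing to free on the host side in this sketch
  if (mindspore::DataQueueHandler::OpenDynamicBufQueue(channel, release) != BlockQueueStatus_T::SUCCESS) {
    return false;  // "[Internal ERROR] Failed to open channel for sending data."
  }
  DataQueueItem item;
  item.data_len_ = nbytes;    // byte size of this row; no min/max shape is registered anywhere
  item.shapes_ = shape;       // the row's own shape, free to differ from the previous row
  item.data_ptr_ = host_buf;
  item.data_type_ = type_name;
  std::vector<DataQueueItem> items{item};
  while (!mindspore::DataQueueHandler::IsClosed()) {
    auto ret = mindspore::DataQueueHandler::Push(channel, items, wait_time);
    if (ret == BlockQueueStatus_T::SUCCESS) break;
    if (ret == BlockQueueStatus_T::ERROR_INPUT) return false;  // mismatched row, give up
  }
  mindspore::DataQueueHandler::Close(channel);
  mindspore::DataQueueHandler::CloseConfirm();
  return true;
}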

View File

@ -196,11 +196,6 @@ if(ENABLE_GPU)
RENAME libnccl.so.2 COMPONENT mindspore)
endif()
endif()
install(
TARGETS gpu_queue
DESTINATION ${INSTALL_LIB_DIR}
COMPONENT mindspore
)
install(
TARGETS cuda_ops
DESTINATION ${INSTALL_LIB_DIR}

View File

@ -133,9 +133,6 @@ if(ENABLE_GPU)
list(APPEND CUDA_NVCC_FLAGS -G)
message("CUDA_NVCC_FLAGS" ${CUDA_NVCC_FLAGS})
endif()
list(REMOVE_ITEM GPU_SRC_LIST "plugin/device/gpu/hal/device/blocking_queue.cc"
"plugin/device/gpu/hal/device/gpu_buffer_mgr.cc")
list(REMOVE_ITEM GPU_SRC_LIST "plugin/device/gpu/hal/device/mpi/mpi_initializer.cc"
"plugin/device/gpu/hal/device/distribution/collective_wrapper.cc"
"plugin/device/gpu/hal/device/distribution/mpi_wrapper.cc"
@ -168,7 +165,6 @@ if(ENABLE_GPU)
add_subdirectory(plugin/device/gpu/kernel/cuda_impl/cuda_ops)
endif()
## make protobuf files
file(GLOB ONNX_PROTO "" ${CMAKE_SOURCE_DIR}/third_party/proto/onnx/onnx.proto)
message("onnx proto path is :" ${ONNX_PROTO})
@ -334,6 +330,7 @@ set(BACKEND_SUB_COMP
runtime/graph_scheduler
runtime/hardware
runtime/pynative
runtime/data_queue
plugin/device/ascend/hal/device
plugin/device/ascend/hal/hardware
plugin/device/ascend/hal/hccl_adapter
@ -392,7 +389,7 @@ if(ENABLE_CPU)
endif()
if(ENABLE_GPU)
message("add gpu lib to mindspore_backend")
target_link_libraries(mindspore_backend PRIVATE gpu_cuda_lib gpu_queue cublas cuda_ops
target_link_libraries(mindspore_backend PRIVATE gpu_cuda_lib cublas cuda_ops
${CUDA_PATH}/lib64/libcurand.so
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so

View File

@ -156,7 +156,7 @@ if(ENABLE_D)
endif()
if(ENABLE_GPU)
target_link_libraries(mindspore_shared_lib PRIVATE gpu_cuda_lib gpu_queue cublas cuda_ops
target_link_libraries(mindspore_shared_lib PRIVATE gpu_cuda_lib cublas cuda_ops
${CUDA_PATH}/lib64/libcurand.so
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so

View File

@ -24,6 +24,7 @@
#include "ir/anf.h"
#include "ir/dtype/type.h"
#include "include/common/visible.h"
#include "utils/callback_handler.h"
namespace mindspore {
COMMON_EXPORT std::string GetNodeFuncStr(const AnfNodePtr &nd);

View File

@ -29,6 +29,7 @@
#include "ir/tensor.h"
#include "base/base_ref.h"
#include "include/common/visible.h"
#include "utils/callback_handler.h"
namespace py = pybind11;
namespace mindspore {

View File

@ -30,19 +30,4 @@
#define COMMON_EXPORT __attribute__((visibility("default")))
#define COMMON_LOCAL __attribute__((visibility("hidden")))
#endif
#define HANDLER_DEFINE(return_type, name, args...) \
public: \
template <typename... Args> \
static return_type name(const Args &... argss) { \
if (name##_handler_ == nullptr) { \
return return_type(); \
} \
return name##_handler_(argss...); \
} \
using name##Handler = std::function<decltype(name<args>)>; \
static void Set##name##Handler(name##Handler handler) { name##_handler_ = std::move(handler); } \
\
private: \
inline static name##Handler name##_handler_;
#endif // MINDSPORE_CCSRC_INCLUDE_COMMON_VISIBLE_H_
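The HANDLER_DEFINE macro deleted here is not gone: the new #include "utils/callback_handler.h" lines in the previous two files suggest it now lives in a shared callback-handler header, and it is presumably the mechanism behind the mindspore::DataQueueHandler calls used throughout this PR. For orientation, a hedged example of how a class uses the macro; every name other than HANDLER_DEFINE is made up for illustration.

#include <functional>
#include <string>
#include "utils/callback_handler.h"

class DemoHandler {
  // Roughly expands to:
  //   template <typename... Args> static int Query(const Args &... args);  // returns int() if no handler set
  //   using QueryHandler = std::function<int(const std::string &)>;
  //   static void SetQueryHandler(QueryHandler handler);
  //   inline static QueryHandler Query_handler_;
  HANDLER_DEFINE(int, Query, std::string);
};

// The implementing module installs the real callback once:
//   DemoHandler::SetQueryHandler([](const std::string &key) { return static_cast<int>(key.size()); });
// Callers then invoke DemoHandler::Query("abc") without a hard link dependency on that module.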

View File

@ -56,6 +56,7 @@ include_directories(${CMAKE_SOURCE_DIR}/mindspore/ccsrc/minddata/mindrecord/incl
include_directories(${CMAKE_SOURCE_DIR}/mindspore/ccsrc/minddata/dataset)
include_directories(${CMAKE_SOURCE_DIR}/mindspore/ccsrc/minddata/dataset/kernels/image)
######################################################################
####################### Flags ########################################
@ -245,12 +246,7 @@ endif()
target_link_libraries(_c_dataengine PUBLIC mindspore::jpeg_turbo mindspore::turbojpeg mindspore::opencv_core
mindspore::opencv_imgcodecs mindspore::opencv_imgproc mindspore::tinyxml2
mindspore::sentencepiece_train ${ICU_LIB})
if(ENABLE_GPUQUE)
target_link_libraries(_c_dataengine PRIVATE gpu_queue
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so
${CUDA_PATH}/lib64/stubs/libcuda.so)
endif()
if(ENABLE_TDTQUE)
target_link_libraries(_c_dataengine PRIVATE ${ACL} ${ACL_TDT_CHANNEL})

View File

@ -73,6 +73,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_enable_watchdog", &ConfigManager::enable_watchdog)
.def("set_multiprocessing_timeout_interval", &ConfigManager::set_multiprocessing_timeout_interval)
.def("get_multiprocessing_timeout_interval", &ConfigManager::multiprocessing_timeout_interval)
.def("set_dynamic_shape", &ConfigManager::set_dynamic_shape)
.def("get_dynamic_shape", &ConfigManager::dynamic_shape)
.def("load", [](ConfigManager &c, const std::string &s) { THROW_IF_ERROR(c.LoadFile(s)); });
}));

View File

@ -275,6 +275,14 @@ class ConfigManager {
// @param interval - multiprocessing timeout interval in seconds
void set_multiprocessing_timeout_interval(uint32_t interval) { multiprocessing_timeout_interval_ = interval; }
// setter function
// @param is_dynamic - Indicates whether the dataset produces dynamic-shape data
void set_dynamic_shape(bool is_dynamic) { dynamic_shape_ = is_dynamic; }
// getter function
// @return - Flag indicating whether the dataset produces dynamic-shape data
bool dynamic_shape() const { return dynamic_shape_; }
private:
// Private helper function that takes a nlohmann json format and populates the settings
// @param j - The json nlohmann json info
@ -308,6 +316,7 @@ class ConfigManager {
bool enable_watchdog_; // Watchdog python thread enabled flag
uint32_t multiprocessing_timeout_interval_; // Multiprocessing timeout interval in seconds
std::string autotune_json_filepath_; // Filepath name of the final AutoTune Configuration JSON file
bool dynamic_shape_{false};
};
} // namespace dataset
} // namespace mindspore
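Taken together with the pybind registration above, the flag travels like this: Python sets it through ConfigManager, and DeviceQueueOp (next file) reads it in its constructor to pick the dynamic send path. A minimal hedged sketch, not a literal excerpt:

// How the new flag reaches the device queue (identifiers taken from this PR).
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
cfg->set_dynamic_shape(true);           // normally driven by the Python binding registered above
bool dynamic = cfg->dynamic_shape();    // DeviceQueueOp caches this as dynamic_shape_ in its constructor
// DeviceQueueOp then branches on it (see the next file):
//   Ascend: dynamic ? SendDataToAscendDynamic() : SendDataToAscend();
//   GPU:    dynamic ? DataQueueHandler::OpenDynamicBufQueue(...) : DataQueueHandler::Open(...);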

View File

@ -41,11 +41,13 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
data_info_queue_ptr_(nullptr),
first_fetch_flag_(false),
first_push_flag_(false) {
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
dynamic_shape_ = cfg->dynamic_shape();
#ifdef ENABLE_GPUQUE
// Get the total device num of current machine
int32_t device_count = 0;
cudaGetDeviceCount(&device_count);
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id(); // Get the current rank_id
if (device_count > 0) {
rank_id_ = rank_id_ % device_count;
@ -154,7 +156,12 @@ Status DeviceQueueOp::operator()() {
RETURN_STATUS_UNEXPECTED(
"[Internal ERROR] Create channel for sending data failed, please check DEVICE ID setting.");
}
RETURN_IF_NOT_OK(SendDataToAscend());
if (dynamic_shape_) {
RETURN_IF_NOT_OK(SendDataToAscendDynamic());
} else {
RETURN_IF_NOT_OK(SendDataToAscend());
}
#endif
} else if (device_type_ == DeviceType::GPU) {
#ifdef ENABLE_GPUQUE
@ -414,6 +421,10 @@ Status DeviceQueueOp::LaunchParallelCopyThread() {
return Status::OK();
}
bool DeviceQueueOp::NoExceptionRaised() {
return !TaskManager::FindMe()->Interrupted() && !mindspore::DataQueueHandler::IsClosed();
}
Status DeviceQueueOp::PushDataToGPU() {
RETURN_UNEXPECTED_IF_NULL(tree_);
// Every thread use cuda api should SetThreadDevice
@ -442,12 +453,17 @@ Status DeviceQueueOp::PushDataToGPU() {
bool eoe_flag = item.eoe_flag;
int64_t send_batch = 0;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function);
BlockQueueStatus_T ret;
if (dynamic_shape_) {
ret = mindspore::DataQueueHandler::OpenDynamicBufQueue(channel_name_, release_function);
} else {
ret = mindspore::DataQueueHandler::Open(channel_name_, release_function);
}
if (ret != BlockQueueStatus_T::SUCCESS) {
RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for sending data.");
}
while (!(items.empty() && !eoe_flag) && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!(items.empty() && !eoe_flag) && !mindspore::DataQueueHandler::IsClosed()) {
if (!eoe_flag) {
#ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(gpu_connector_->size());
@ -482,14 +498,14 @@ Status DeviceQueueOp::PushDataToGPU() {
}
#endif
}
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (NoExceptionRaised()) {
auto rc = gpu_connector_->Pop(0, &item);
items = std::move(item.data_item);
eoe_flag = item.eoe_flag;
// If the dataset sends more batches than the GPU consumes, the GPU will core dump waiting for a signal that never arrives.
if (rc.IsError()) {
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
mindspore::DataQueueHandler::Close(channel_name_);
mindspore::DataQueueHandler::CloseConfirm();
return rc;
}
} else {
@ -498,51 +514,14 @@ Status DeviceQueueOp::PushDataToGPU() {
}
// now we use this flag to judge whether exception raised.
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (NoExceptionRaised()) {
send_finished_ = true;
}
tree_->SetFinished();
MS_LOG(INFO) << "ExecutionTree finished. Device queue pushed number of batches: " << send_batch;
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
return Status::OK();
}
Status DeviceQueueOp::RetryPushData(const std::vector<DataItemGpu> &items, const bool profiling, uint64_t *push_time) {
bool flag_log = false;
#ifndef ENABLE_SECURITY
uint64_t start_time = 0;
if (profiling) {
start_time = ProfilingTime::GetCurMilliSecond();
}
#endif
while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(channel_name_, items, WAIT_TIME);
if (ret) {
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
RETURN_STATUS_UNEXPECTED(
"Invalid data, the types or shapes of current row is different with previous row(i.e. do batch operation but "
"drop_reminder is False, or without resize image into the same size, these will cause shapes differs).");
} else {
if (!stop_send_) {
if (!flag_log) {
MS_LOG(DEBUG) << "Retry pushing data...";
flag_log = true;
}
continue;
}
break;
}
} else {
break;
}
}
#ifndef ENABLE_SECURITY
if (profiling) {
*push_time = ProfilingTime::GetCurMilliSecond() - start_time;
}
#endif
mindspore::DataQueueHandler::Close(channel_name_);
mindspore::DataQueueHandler::CloseConfirm();
return Status::OK();
}
@ -554,12 +533,12 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
TensorRow current_row;
uint32_t batch_num = 0;
RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
while (!current_row.quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_row.quit() && !mindspore::DataQueueHandler::IsClosed()) {
GpuConnectorItem connector_item = {{}, current_row.eoe()};
if (!connector_item.eoe_flag) {
std::vector<device::DataItemGpu> items;
std::vector<device::DataQueueItem> items;
for (auto &i : current_row) {
device::DataItemGpu data_item;
device::DataQueueItem data_item;
data_item.data_len_ = static_cast<size_t>(i->SizeInBytes());
data_item.shapes_ = i->shape().AsVector();
data_item.data_ptr_ = nullptr;
@ -596,8 +575,8 @@ Status DeviceQueueOp::SendDataToGPU() {
int64_t num_buf = 0;
bool is_break_loop = false;
uint32_t batch_num = 0;
while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_row.eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_row.eof() && !is_break_loop && !mindspore::DataQueueHandler::IsClosed()) {
while (!current_row.eoe() && !is_break_loop && !mindspore::DataQueueHandler::IsClosed()) {
batch_num++;
RETURN_IF_NOT_OK(FilterMetadata(&current_row));
RETURN_IF_NOT_OK(CheckExceptions(current_row));
@ -610,7 +589,7 @@ Status DeviceQueueOp::SendDataToGPU() {
#ifndef ENABLE_SECURITY
batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (NoExceptionRaised()) {
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
} else {
is_break_loop = true;
@ -623,7 +602,7 @@ Status DeviceQueueOp::SendDataToGPU() {
RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(eoe_flag)));
}
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (NoExceptionRaised()) {
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
} else {
is_break_loop = true;
@ -640,7 +619,7 @@ Status DeviceQueueOp::SendDataToGPU() {
return Status::OK();
}
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row,
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
const int32_t &worker_id) {
int i = 0;
for (auto &sub_item : *items) {
@ -667,16 +646,16 @@ Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items,
Status DeviceQueueOp::ClearDevice() {
MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function);
auto ret = mindspore::DataQueueHandler::Open(channel_name_, release_function);
if (ret != BlockQueueStatus_T::SUCCESS) {
RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for clearing the device.");
}
ret = GpuBufferMgr::GetInstance().Clear(channel_name_);
ret = mindspore::DataQueueHandler::Clear(channel_name_);
CHECK_FAIL_RETURN_UNEXPECTED(!ret, "Failed to clear the device.");
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
mindspore::DataQueueHandler::Close(channel_name_);
mindspore::DataQueueHandler::CloseConfirm();
return Status::OK();
}
@ -791,5 +770,114 @@ void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
*first_push_flag = true;
}
}
Status DeviceQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, const bool profiling,
uint64_t *push_time) {
bool flag_log = false;
#ifndef ENABLE_SECURITY
uint64_t start_time = 0;
if (profiling) {
start_time = ProfilingTime::GetCurMilliSecond();
}
#endif
while (!mindspore::DataQueueHandler::IsClosed() && !TaskManager::FindMe()->Interrupted()) {
BlockQueueStatus_T ret = mindspore::DataQueueHandler::Push(channel_name_, items, WAIT_TIME);
if (ret != BlockQueueStatus_T::SUCCESS) {
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
return Status(
StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
"Invalid data, the types or shapes of current row is different with previous row(i.e. do batch operation but "
"drop_reminder is False, or without resize image into the same size, these will cause shapes differs).");
} else {
if (!stop_send_) {
if (!flag_log) {
MS_LOG(DEBUG) << "Retry pushing data...";
flag_log = true;
}
continue;
}
break;
}
} else {
break;
}
}
#ifndef ENABLE_SECURITY
if (profiling) {
*push_time = ProfilingTime::GetCurMilliSecond() - start_time;
}
#endif
return Status::OK();
}
Status DeviceQueueOp::SendDataToAscendDynamic() {
MS_LOG(DEBUG) << "Dynamic Device queue, sending data to Ascend.";
int64_t send_batch = 0;
uint64_t data_queue_cost = 0;
bool is_break_loop = false;
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
std::function<void(void *, int32_t)> release_function([](void *, int32_t) { return; });
auto ret = mindspore::DataQueueHandler::OpenDynamicBufQueue(channel_name_, release_function);
if (ret != BlockQueueStatus_T::SUCCESS) {
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
"[Internal ERROR] Failed to open channel for sending data.");
}
TensorRow curr_row;
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
first_fetch_flag_ = true;
while (!curr_row.eof() && !is_break_loop) {
while (!curr_row.eoe() && !is_break_loop) {
RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
RETURN_IF_NOT_OK(CheckExceptions(curr_row));
std::vector<device::DataQueueItem> items;
for (auto &i : curr_row) {
device::DataQueueItem data_item;
data_item.data_len_ = static_cast<size_t>(i->SizeInBytes());
data_item.shapes_ = i->shape().AsVector();
data_item.data_ptr_ = const_cast<void *>((const void *)(i->GetBuffer()));
data_item.data_type_ = i->type().ToString();
items.push_back(data_item);
}
RETURN_IF_NOT_OK(RetryPushData(items, false, &data_queue_cost));
if (create_data_info_queue_) {
DATA_INFO data_info;
(void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
}
send_batch++;
if (total_batch_ > 0 && send_batch >= total_batch_) {
is_break_loop = true;
break;
}
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
}
if (curr_row.eoe() && send_epoch_end_) {
MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
stop_send_ = true;
}
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
}
// Use this flag to judge whether an exception was raised.
if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
send_finished_ = true;
}
tree_->SetFinished();
MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
mindspore::DataQueueHandler::Close(channel_name_);
mindspore::DataQueueHandler::CloseConfirm();
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h"
#include "mindspore/core/utils/data_queue_handler.h"
#ifdef ENABLE_DUMP_IR
#include "minddata/dataset/util/rdr.h"
#endif
@ -39,24 +40,22 @@
#ifdef ENABLE_GPUQUE
#include "minddata/dataset/engine/gpu_item_connector.h"
#include "minddata/dataset/util/circular_pool.h"
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::GpuBufferMgr;
#endif
namespace mindspore {
namespace dataset {
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::DataQueueItem;
constexpr int32_t kTimeOutMilliSeconds = 25000;
const int kDataInfoQueueCapacity = 128;
class DeviceQueueOp : public PipelineOp {
public:
static const uint32_t INVALID_HANDLE = 0xffffffffUL;
static const uint32_t WAIT_TIME = 5;
const uint32_t WAIT_TIME = 5;
enum class DeviceType { Ascend = 0, GPU = 1, CPU = 2 };
@ -131,6 +130,9 @@ class DeviceQueueOp : public PipelineOp {
// Name: PrintEndInfoWhenFirstBatch(bool)
// Description: Print info when first batch send successful in sink_mode
void PrintEndInfoWhenFirstBatch(bool *first_push_flag);
Status RetryPushData(const std::vector<DataQueueItem> &data, bool profiling, uint64_t *push_time);
bool NoExceptionRaised();
Status SendDataToAscendDynamic();
#ifdef ENABLE_TDTQUE
void WaitContinueSignal() const;
@ -144,8 +146,8 @@ class DeviceQueueOp : public PipelineOp {
#ifdef ENABLE_GPUQUE
Status SendDataToGPU();
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
Status RetryPushData(const std::vector<DataItemGpu> &data, bool profiling, uint64_t *push_time);
Status MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
const int32_t &worker_id);
void ReleaseData(void *addr, int32_t worker_id);
Status LaunchParallelCopyThread();
Status PushDataToGPU();
@ -188,6 +190,7 @@ class DeviceQueueOp : public PipelineOp {
std::atomic<bool> first_fetch_flag_;
std::mutex data_info_mutex_;
bool first_push_flag_; // default: false, when first push, it will be true
bool dynamic_shape_{false};
#ifdef ENABLE_TDTQUE
std::shared_ptr<TdtPlugin> tdtInstancePtr;

View File

@ -24,15 +24,15 @@
#include "minddata/dataset/engine/connector.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "plugin/device/gpu/hal/device/blocking_queue.h"
#include "runtime/data_queue/blocking_queue.h"
using mindspore::device::DataItemGpu;
using mindspore::device::DataQueueItem;
namespace mindspore {
namespace dataset {
struct GpuConnectorItem {
std::vector<device::DataItemGpu> data_item;
std::vector<device::DataQueueItem> data_item;
bool eoe_flag; // flag to indicate an EOE item in the connector
};

View File

@ -0,0 +1,77 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "plugin/device/ascend/hal/device/ascend_data_queue.h"
#include <string>
#include "plugin/device/ascend/hal/device/ascend_kernel_runtime.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace device {
void CheckRtRetWithError(rtError_t error, const std::string &msg) {
if (error != RT_ERROR_NONE) {
MS_LOG(ERROR) << "Rt error: " << msg << " | Error number: " << error;
}
}
AscendDataQueueDynamic::AscendDataQueueDynamic(const size_t capacity)
: DataQueue(capacity), stream_(0), node_info_(nullptr) {
auto context_key = device_context_->device_context_key();
auto runtime_instance = dynamic_cast<ascend::AscendKernelRuntime *>(
device::KernelRuntimeManager::Instance().GetKernelRuntime(context_key.device_name_, context_key.device_id_));
node_info_ = std::make_unique<NodeInfo[]>(capacity);
stream_ = runtime_instance->compute_stream();
}
BlockQueueStatus_T AscendDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
if (item.data_ptr_ == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
return ERROR_INPUT;
}
void *addr = device_context_->AllocateMemory(item.data_len_);
if (addr == nullptr) {
MS_LOG(ERROR) << "Allocate device memory of data queue failed";
}
CheckRtRetWithError(
rtMemcpyAsync(addr, item.data_len_, item.data_ptr_, item.data_len_, RT_MEMCPY_HOST_TO_DEVICE, stream_),
"Rt Memcpy Error");
item.device_addr_ = addr;
}
CheckRtRetWithError(rtStreamSynchronize(stream_), "Call runtime rtStreamSynchronize failed");
node_info_[tail_].data_ = data;
tail_ = (tail_ + 1) % (capacity_);
++size_;
return SUCCESS;
}
BlockQueueStatus_T AscendDataQueueDynamic::Front(std::vector<DataQueueItem> *data) const {
for (auto &item : node_info_[head_].data_) {
host_release_(item.data_ptr_, item.worker_id_);
}
*data = node_info_[head_].data_;
return SUCCESS;
}
BlockQueueStatus_T AscendDataQueueDynamic::Pop() {
head_ = (head_ + 1) % (capacity_);
--size_;
return SUCCESS;
}
bool AscendDataQueueDynamic::Destroy() { return true; }
} // namespace device
} // namespace mindspore
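Push() and Pop() above manipulate a fixed-capacity ring of NodeInfo slots: tail_ advances on Push, head_ on Pop, and size_ tracks occupancy, while the full/empty blocking waits live in the surrounding BlockingQueue wrapper (as in the GPU implementation replaced later in this diff). A self-contained toy version of that bookkeeping, for orientation only:

#include <cstddef>
#include <vector>

// Toy ring buffer illustrating the index arithmetic shared by the Ascend and GPU queues.
struct Ring {
  explicit Ring(std::size_t capacity) : capacity_(capacity), slots_(capacity) {}
  bool Full() const { return size_ == capacity_; }
  bool Empty() const { return size_ == 0; }
  void Push(int v) {            // caller is expected to check Full() first
    slots_[tail_] = v;
    tail_ = (tail_ + 1) % capacity_;
    ++size_;
  }
  int Front() const { return slots_[head_]; }
  void Pop() {                  // caller is expected to check Empty() first
    head_ = (head_ + 1) % capacity_;
    --size_;
  }
  std::size_t head_ = 0, tail_ = 0, size_ = 0;
  std::size_t capacity_;
  std::vector<int> slots_;
};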

View File

@ -0,0 +1,50 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_ASCEND_DATA_QUEUE_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_ASCEND_DATA_QUEUE_H_
#include <unistd.h>
#include <memory>
#include <vector>
#include <functional>
#include "runtime/hardware/device_context_manager.h"
#include "runtime/data_queue/data_queue.h"
#include "runtime/rt.h"
namespace mindspore {
namespace device {
class AscendDataQueueDynamic : public DataQueue {
public:
explicit AscendDataQueueDynamic(const size_t capacity);
virtual ~AscendDataQueueDynamic() = default;
BlockQueueStatus_T Push(std::vector<DataQueueItem> data);
BlockQueueStatus_T Front(std::vector<DataQueueItem> *data) const;
BlockQueueStatus_T Pop();
bool Destroy();
private:
struct NodeInfo {
std::vector<DataQueueItem> data_;
};
rtStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
};
} // namespace device
} // namespace mindspore
#endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_ASCEND_DATA_QUEUE_H_

View File

@ -848,6 +848,9 @@ std::tuple<KernelSelectStatus, std::string, ExceptionType> SelectKernelInfoWithM
SetTensorDeviceInfo(kernel_node);
select_status = kStatusAllMatched;
}
if (IsPrimitiveCNode(kernel_node, prim::kPrimGetNext) && !common::AnfAlgo::IsDynamicShape(kernel_node)) {
select_status = kNoMatched;
}
// If node can't find valid ai_core kernel info, re-find in ai_cpu kernel info
if (select_status == kNoMatched) {
GatherInputAndOutputInferType(aicore_in_out_info, kernel_node);

View File

@ -38,6 +38,7 @@
#include "plugin/device/ascend/hal/device/ascend_bucket.h"
#include "common/util/error_manager/error_manager.h"
#include "plugin/device/ascend/hal/device/ascend_memory_adapter.h"
#include "runtime/data_queue/data_queue_mgr.h"
#include "backend/common/optimizer/common_backend_optimization.h"
#ifndef ENABLE_SECURITY
#include "debug/data_dump/dump_json_parser.h"
@ -314,6 +315,10 @@ void AscendDeviceContext::Destroy() {
return;
}
MS_LOG(INFO) << "Status record: Destroy start...";
if (DataQueueMgr::GetInstance().IsInit()) {
MS_EXCEPTION_IF_CHECK_FAIL(DataQueueMgr::GetInstance().Destroy(), "Could not destroy ascend data queue.");
}
graph_event_.clear();
rank_id_ = 0;
if (runtime_instance_) {

View File

@ -0,0 +1,93 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "plugin/device/ascend/kernel/rts/getnext_dynamic.h"
#include <memory>
#include <numeric>
#include <functional>
#include <string>
#include <map>
#include "abstract/utils.h"
#include "backend/common/session/anf_runtime_algorithm.h"
#include "include/common/utils/anfalgo.h"
#include "kernel/common_utils.h"
#include "runtime/mem.h"
#include "acl/acl_rt.h"
#include "runtime/device/kernel_runtime.h"
#include "utils/ms_context.h"
#include "runtime/data_queue/data_queue_mgr.h"
namespace mindspore {
namespace kernel {
GetNextDynamic::GetNextDynamic() {}
GetNextDynamic::~GetNextDynamic() {}
bool GetNextDynamic::Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &,
const std::vector<AddressPtr> &outputs, void *stream_ptr) {
return true;
}
bool GetNextDynamic::Init(const mindspore::AnfNodePtr &anf_node) {
anf_node_ = anf_node;
auto output_num = common::AnfAlgo::GetOutputTensorNum(anf_node);
std::vector<size_t> size_list;
for (size_t idx = 0; idx < output_num; ++idx) {
size_list.push_back(0);
}
SetOutputSizeList(size_list);
return true;
}
int GetNextDynamic::Resize(const BaseOperatorPtr &base_operator, const std::vector<KernelTensorPtr> &inputs,
const std::vector<KernelTensorPtr> &outputs,
const std::map<uint32_t, tensor::TensorPtr> &others) {
auto data_kernel = anf_node_.lock();
bool ret = device::PopDataFromDataQueue(data_kernel);
if (!ret) {
return KernelErrorCode::KRET_RESIZE_FAILED;
}
return KernelErrorCode::KRET_OK;
}
GetNextDynamicDesc::GetNextDynamicDesc() {}
GetNextDynamicDesc::~GetNextDynamicDesc() {}
// GetNextDynamic KernelInfo Register
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetNextDynamicDesc::GetKernelInfo(const CNodePtr &kernel_node) {
MS_EXCEPTION_IF_NULL(kernel_node);
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> get_next_dynamic_build_info{};
auto output_num = common::AnfAlgo::GetOutputTensorNum(kernel_node);
std::vector<string> output_format;
std::vector<TypeId> output_type;
for (size_t idx = 0; idx < output_num; ++idx) {
auto data_type = common::AnfAlgo::GetOutputInferDataType(kernel_node, idx);
output_type.push_back(data_type);
output_format.push_back(kOpFormat_DEFAULT);
}
auto builder = KernelBuildInfo::KernelBuildInfoBuilder();
builder.SetOutputsFormat(output_format);
builder.SetOutputsDeviceType(output_type);
builder.SetProcessor(AICORE);
builder.SetKernelType(RT_KERNEL);
builder.SetFusionType(OPAQUE);
get_next_dynamic_build_info.emplace_back(builder.Build());
return get_next_dynamic_build_info;
}
} // namespace kernel
} // namespace mindspore

View File

@ -0,0 +1,58 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_RTS_GETNEXT_DYNAMIC_H
#define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_RTS_GETNEXT_DYNAMIC_H
#include <vector>
#include <memory>
#include <map>
#include "plugin/device/ascend/kernel/rts/rt_kernel.h"
#include "plugin/device/ascend/kernel/rts/rt_kernel_info.h"
namespace mindspore {
namespace kernel {
class GetNextDynamic : public RtKernel {
public:
GetNextDynamic();
~GetNextDynamic() override;
bool Init(const AnfNodePtr &anf_node) override;
bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs, void *stream_ptr) override;
int Resize(const BaseOperatorPtr &base_operator, const std::vector<KernelTensorPtr> &inputs,
const std::vector<KernelTensorPtr> &outputs,
const std::map<uint32_t, tensor::TensorPtr> &others = std::map<uint32_t, tensor::TensorPtr>());
std::vector<TaskInfoPtr> GenTask(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
const std::vector<AddressPtr> &, uint32_t) {
std::vector<TaskInfoPtr> res;
return res;
}
};
class GetNextDynamicDesc : public RtKerDesc {
public:
GetNextDynamicDesc();
~GetNextDynamicDesc() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo(const CNodePtr &kernel_node = nullptr) override;
};
MS_REG_RTKERNEL_DESC(getnext, GetNextDynamicDesc);
MS_REG_RTKERNEL(getnext, GetNextDynamic);
} // namespace kernel
} // namespace mindspore
#endif // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_RTS_GETNEXT_DYNAMIC_H

View File

@ -74,7 +74,7 @@ std::vector<TaskInfoPtr> LabelSwitchKernel::GenTask(const std::vector<AddressPtr
return task_info_list;
}
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> LabelSwitchDesc::GetKernelInfo() {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> LabelSwitchDesc::GetKernelInfo(const CNodePtr &) {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> label_switch_build_info{};
std::vector<string> input_format{kOpFormat_DEFAULT};
std::vector<TypeId> input_type{kNumberTypeInt32};

View File

@ -46,7 +46,7 @@ class LabelSwitchDesc : public RtKerDesc {
public:
LabelSwitchDesc() = default;
~LabelSwitchDesc() override = default;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo(const CNodePtr &kernel_node = nullptr) override;
};
MS_REG_RTKERNEL_DESC(labelswitch, LabelSwitchDesc);

View File

@ -144,7 +144,7 @@ MemCpyAsyncDesc::MemCpyAsyncDesc() {}
MemCpyAsyncDesc::~MemCpyAsyncDesc() {}
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> MemCpyAsyncDesc::GetKernelInfo() {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> MemCpyAsyncDesc::GetKernelInfo(const CNodePtr &) {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> memcpy_build_info{};
for (const auto &format : format_list) {
for (const auto &type : data_type_list) {

View File

@ -45,7 +45,7 @@ class MemCpyAsyncDesc : public RtKerDesc {
public:
MemCpyAsyncDesc();
~MemCpyAsyncDesc() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo(const CNodePtr &kernel_node = nullptr) override;
};
MS_REG_RTKERNEL_DESC(memcpy_async, MemCpyAsyncDesc);

View File

@ -32,7 +32,6 @@ class RtKernel : public AscendKernelMod {
RtKernel();
~RtKernel() override;
virtual bool Init(const AnfNodePtr &anf_node);
void SetInputSizeList(const std::vector<size_t> &size_list) override;
void SetOutputSizeList(const std::vector<size_t> &size_list) override;
void SetWorkspaceSizeList(const std::vector<size_t> &size_list) override;

View File

@ -59,8 +59,8 @@ void GetRtKelInfo(const CNodePtr &kernel_node,
(void)std::transform(opNameLower.begin(), opNameLower.end(), opNameLower.begin(), ::tolower);
auto ker_desc_ptr = RtKerDescFactory::Create(opNameLower);
if (ker_desc_ptr != nullptr && !ker_desc_ptr->GetKernelInfo().empty()) {
*kernel_info_list = ker_desc_ptr->GetKernelInfo();
if (ker_desc_ptr != nullptr && !ker_desc_ptr->GetKernelInfo(kernel_node).empty()) {
*kernel_info_list = ker_desc_ptr->GetKernelInfo(kernel_node);
return;
}
// if can't find kernel info in kernel info database, use the default kernel info

View File

@ -35,7 +35,7 @@ namespace kernel {
class RtKerDesc {
public:
virtual ~RtKerDesc() {}
virtual std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo() {
virtual std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo(const CNodePtr &kernel_node = nullptr) {
return std::vector<std::shared_ptr<kernel::KernelBuildInfo>>{};
}
};

View File

@ -171,7 +171,7 @@ TensorCopySlicesDesc::TensorCopySlicesDesc() {}
TensorCopySlicesDesc::~TensorCopySlicesDesc() {}
// TensorCopySlices Register
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> TensorCopySlicesDesc::GetKernelInfo() {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> TensorCopySlicesDesc::GetKernelInfo(const CNodePtr &) {
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> tensor_copy_slices_build_info{};
for (const auto &format : format_list) {
for (const auto &type : data_type_list) {

View File

@ -55,7 +55,7 @@ class TensorCopySlicesDesc : public RtKerDesc {
public:
TensorCopySlicesDesc();
~TensorCopySlicesDesc() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo() override;
std::vector<std::shared_ptr<kernel::KernelBuildInfo>> GetKernelInfo(const CNodePtr &kernel_node = nullptr) override;
};
MS_REG_RTKERNEL_DESC(tensorcopyslices, TensorCopySlicesDesc);

View File

@ -20,17 +20,11 @@ if(ENABLE_GPU)
file(GLOB_RECURSE CUDA_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc" "*.cu")
set(GPU_QUEUE_SRCS "blocking_queue.cc" "gpu_buffer_mgr.cc")
#set(GPU_QUEUE_SRCS "blocking_queue.cc" "gpu_buffer_mgr.cc" "data_queue.cc")
set(GPU_COLLECTIVE_SRCS "distribution/collective_wrapper.cc"
"distribution/mpi_wrapper.cc"
"distribution/nccl_wrapper.cc")
# gpu_queue
list(REMOVE_ITEM CUDA_SRC_LIST ${GPU_QUEUE_SRCS})
set_property(SOURCE ${GPU_QUEUE_SRCS} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_DEVICE)
add_library(gpu_queue SHARED ${GPU_QUEUE_SRCS})
target_link_libraries(gpu_queue ${CMAKE_THREAD_LIBS_INIT} ${CUDA_PATH}/lib64/libcudart.so)
list(REMOVE_ITEM CUDA_SRC_LIST "mpi/mpi_initializer.cc" ${GPU_COLLECTIVE_SRCS})
if(ENABLE_MPI)

View File

@ -1,106 +0,0 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_
#include <unistd.h>
#include <cuda_runtime_api.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <cstring>
#include <string>
#include <vector>
#include <condition_variable>
#include <functional>
namespace mindspore {
namespace device {
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, QUEUE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
struct DataItemGpu {
int32_t worker_id_{0};
std::string data_type_;
size_t data_len_{0};
void *data_ptr_{nullptr};
std::vector<int64_t> shapes_;
void *device_addr_{nullptr};
};
class GpuQueue {
public:
GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
virtual ~GpuQueue();
void RegisterRelease(const std::function<void(void *, int32_t)> &func) { host_release_ = func; }
inline bool IsEmpty() const { return size_ == 0; }
inline bool IsFull() const { return size_ == capacity_; }
BlockQueueStatus_T Push(std::vector<DataItemGpu> data);
BlockQueueStatus_T Front(std::vector<DataItemGpu> *data) const;
BlockQueueStatus_T Pop();
bool Destroy();
size_t Size() { return size_; }
size_t Capacity() { return capacity_; }
private:
struct NodeInfo {
std::unique_ptr<cudaEvent_t> event_;
std::vector<DataItemGpu> data_;
};
void *buffer_;
size_t head_;
size_t tail_;
std::vector<size_t> shape_;
size_t len_;
size_t size_;
size_t capacity_;
cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
std::function<void(void *, int32_t)> host_release_;
GpuQueue(const GpuQueue &) = delete;
GpuQueue &operator=(const GpuQueue &) = delete;
};
class BlockingQueue {
public:
BlockingQueue() : queue_(nullptr) {}
~BlockingQueue() = default;
BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
void RegisterRelease(const std::function<void(void *, int32_t)> &func);
BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec);
BlockQueueStatus_T Front(std::vector<DataItemGpu> *data);
BlockQueueStatus_T Pop();
BlockQueueStatus_T Clear();
bool Destroy();
size_t Size() { return queue_->Size(); }
size_t Capacity() { return queue_->Capacity(); }
private:
std::mutex mutex_;
std::condition_variable not_full_cond_;
std::condition_variable not_empty_cond_;
std::shared_ptr<GpuQueue> queue_;
};
} // namespace device
} // namespace mindspore
#endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_

View File

@ -1,202 +0,0 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include <cuda_runtime_api.h>
#include <utility>
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
namespace py = pybind11;
namespace mindspore {
namespace device {
GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept {
static GpuBufferMgr instance;
return instance;
}
BlockQueueStatus_T GpuBufferMgr::Create(const std::string &channel_name, void *addr, const std::vector<size_t> &shape,
const size_t &capacity) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " created.";
if (name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue already exist: " << channel_name;
return QUEUE_EXIST;
}
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
BlockQueueStatus_T rt = queue->Create(addr, shape, capacity);
if (rt != SUCCESS) {
MS_LOG(ERROR) << "Queue: " << channel_name << "create failed: " << rt;
return rt;
}
(void)name_queue_map_.insert(std::make_pair(channel_name, queue));
init_ = true;
return SUCCESS;
}
BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector<size_t> &shape,
const std::function<void(void *, int32_t)> func) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " open.";
set_device();
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
name_queue_map_[channel_name]->RegisterRelease(func);
open_by_dataset_++;
return SUCCESS;
}
BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector<size_t> &shape) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " open.";
set_device();
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return SUCCESS;
}
void GpuBufferMgr::set_device_id(int device_id) { cur_dev_id_ = device_id; }
void GpuBufferMgr::set_device() const {
auto ret = cudaSetDevice(cur_dev_id_);
if (ret != cudaSuccess) {
MS_LOG(ERROR)
<< "Set device for id:" << cur_dev_id_ << " failed, ret[" << static_cast<int>(ret) << "], "
<< cudaGetErrorString(ret)
<< ". Please make sure that the 'device_id' set in context is in the range:[0, total number of GPU). "
"If the environment variable 'CUDA_VISIBLE_DEVICES' is set, the total number of GPU will be the number set "
"in the environment variable 'CUDA_VISIBLE_DEVICES'. For example, if export CUDA_VISIBLE_DEVICES=4,5,6, the "
"'device_id' can be 0,1,2 at the moment, 'device_id' starts from 0, and 'device_id'=0 means using GPU of "
"number 4.";
}
}
BlockQueueStatus_T GpuBufferMgr::Push(const std::string &channel_name, const std::vector<DataItemGpu> &data,
unsigned int timeout_in_sec) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Push(data, timeout_in_sec);
}
BlockQueueStatus_T GpuBufferMgr::Front(const std::string &channel_name, std::vector<DataItemGpu> *data) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Front(data);
}
BlockQueueStatus_T GpuBufferMgr::Pop(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Pop();
}
BlockQueueStatus_T GpuBufferMgr::Clear(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Clear();
}
void GpuBufferMgr::Close(const std::string &channel_name) noexcept {
MS_LOG(INFO) << "Close the queue: " << channel_name;
return;
}
bool GpuBufferMgr::IsInit() const { return init_; }
bool GpuBufferMgr::IsClosed() const { return closed_; }
bool GpuBufferMgr::Destroy() {
MS_LOG(INFO) << "Destroy all GPU queue.";
for (auto iter = name_queue_map_.begin(); iter != name_queue_map_.end(); ++iter) {
std::shared_ptr<BlockingQueue> queue = iter->second;
if (queue != nullptr) {
if (!queue->Destroy()) {
return false;
}
queue.reset();
}
}
name_queue_map_.clear();
return true;
}
inline bool GpuBufferMgr::isCreated(const std::string &channel_name) {
if (name_queue_map_.count(channel_name) != 0) {
return true;
}
return false;
}
bool GpuBufferMgr::CloseNotify() {
py::gil_scoped_release release;
bool result = true;
// lock scope
{
std::lock_guard<std::mutex> lk(close_mutex_);
// set closed_ to be true, all the dataset retry can be jumped out of the while
closed_ = true;
}
// wati for the dataset threads' ack
for (int i = 0; i < open_by_dataset_; i++) {
if (sema.Wait() == false) {
MS_LOG(ERROR) << "time out of receiving signals";
result = false;
}
MS_LOG(DEBUG) << "receive one signal (" << i + 1 << "/" << open_by_dataset_ << ")";
}
return result;
}
void GpuBufferMgr::CloseConfirm() { sema.Signal(); }
size_t GpuBufferMgr::Size(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
return name_queue_map_.at(channel_name)->Size();
}
size_t GpuBufferMgr::Capacity(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
return name_queue_map_.at(channel_name)->Capacity();
}
} // namespace device
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,26 +13,77 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "plugin/device/gpu/hal/device/blocking_queue.h"
#include <chrono>
#include "plugin/device/gpu/hal/device/gpu_data_queue.h"
#include <string>
#include "plugin/device/gpu/hal/device/queue_common.h"
#include "utils/ms_utils.h"
#include "utils/ms_context.h"
namespace mindspore {
namespace device {
const size_t kTimeout = 100;
GpuDataQueueDynamic::GpuDataQueueDynamic(const size_t capacity) : DataQueue(capacity), stream_(0), node_info_(nullptr) {
CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
node_info_ = std::make_unique<NodeInfo[]>(capacity);
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
const std::string &device_target = ms_context->get_param<std::string>(MS_CTX_DEVICE_TARGET);
uint32_t device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
device_context_ = DeviceContextManager::GetInstance().GetOrCreateDeviceContext({device_target, device_id});
device_context_->Initialize();
}
BlockQueueStatus_T GpuDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
if (item.data_ptr_ == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
return ERROR_INPUT;
}
void *addr = device_context_->AllocateMemory(item.data_len_);
CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
"Cuda Memcpy Error");
item.device_addr_ = addr;
}
node_info_[tail_].event_.reset(new cudaEvent_t());
CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
CHECK_CUDA_RET_WITH_ERROR(cudaEventRecord(*(node_info_[tail_].event_), stream_), "Cuda Create Event Failed");
node_info_[tail_].data_ = data;
tail_ = (tail_ + 1) % (capacity_);
++size_;
return SUCCESS;
}
BlockQueueStatus_T GpuDataQueueDynamic::Front(std::vector<DataQueueItem> *data) const {
CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
for (auto &item : node_info_[head_].data_) {
host_release_(item.data_ptr_, item.worker_id_);
}
*data = node_info_[head_].data_;
return SUCCESS;
}
BlockQueueStatus_T GpuDataQueueDynamic::Pop() {
head_ = (head_ + 1) % (capacity_);
--size_;
return SUCCESS;
}
bool GpuDataQueueDynamic::Destroy() {
if (stream_ != nullptr) {
auto ret = cudaStreamDestroy(stream_);
if (ret == cudaSuccess) {
stream_ = nullptr;
return true;
} else {
return false;
}
} else {
return true;
}
}
GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
: buffer_(addr),
head_(0),
tail_(0),
shape_(shape),
len_(0),
size_(0),
capacity_(capacity),
stream_(0),
node_info_(nullptr) {
: DataQueue(capacity), buffer_(addr), shape_(shape), len_(0), stream_(0), node_info_(nullptr) {
CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
node_info_ = std::make_unique<NodeInfo[]>(capacity);
for (auto item : shape) {
@ -42,7 +93,7 @@ GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &c
GpuQueue::~GpuQueue() { buffer_ = nullptr; }
BlockQueueStatus_T GpuQueue::Push(std::vector<DataItemGpu> data) {
BlockQueueStatus_T GpuQueue::Push(std::vector<DataQueueItem> data) {
void *addr = reinterpret_cast<uint8_t *>(buffer_) + tail_ * len_;
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
@ -66,7 +117,7 @@ BlockQueueStatus_T GpuQueue::Push(std::vector<DataItemGpu> data) {
return SUCCESS;
}
BlockQueueStatus_T GpuQueue::Front(std::vector<DataItemGpu> *data) const {
BlockQueueStatus_T GpuQueue::Front(std::vector<DataQueueItem> *data) const {
CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
for (auto &item : node_info_[head_].data_) {
@ -94,76 +145,5 @@ bool GpuQueue::Destroy() {
return true;
}
}
BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
if (addr == nullptr) {
MS_LOG(ERROR) << "addr is nullptr";
return INTERNAL_ERROR;
}
queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
return SUCCESS;
}
void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }
BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
std::unique_lock<std::mutex> locker(mutex_);
if (queue_->IsFull()) {
if (not_full_cond_.wait_for(locker, std::chrono::microseconds(kTimeout)) == std::cv_status::timeout) {
return TIMEOUT;
}
}
auto ret = queue_->Push(data);
if (ret) {
return ret;
}
not_empty_cond_.notify_one();
return SUCCESS;
}
BlockQueueStatus_T BlockingQueue::Front(std::vector<DataItemGpu> *data) {
std::unique_lock<std::mutex> locker(mutex_);
bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
if (!timeout) {
return TIMEOUT;
}
return queue_->Front(data);
}
BlockQueueStatus_T BlockingQueue::Pop() {
std::unique_lock<std::mutex> locker(mutex_);
not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
auto ret = queue_->Pop();
if (ret) {
return ret;
}
not_full_cond_.notify_one();
return SUCCESS;
}
BlockQueueStatus_T BlockingQueue::Clear() {
std::unique_lock<std::mutex> locker(mutex_);
while (Size() > 0) {
std::vector<DataItemGpu> data;
auto ret = queue_->Front(&data);
if (ret) {
return ret;
}
ret = queue_->Pop();
if (ret) {
return ret;
}
}
return SUCCESS;
}
bool BlockingQueue::Destroy() {
if (queue_ != nullptr) {
return queue_->Destroy();
} else {
return true;
}
}
} // namespace device
} // namespace mindspore

View File

@ -0,0 +1,78 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_DATA_QUEUE_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_DATA_QUEUE_H_
#include <cuda_runtime_api.h>
#include <unistd.h>
#include <memory>
#include <vector>
#include <functional>
#include "runtime/data_queue/data_queue.h"
#include "runtime/hardware/device_context_manager.h"
namespace mindspore {
namespace device {
class GpuDataQueueDynamic : public DataQueue {
public:
explicit GpuDataQueueDynamic(const size_t capacity);
virtual ~GpuDataQueueDynamic() = default;
BlockQueueStatus_T Push(std::vector<DataQueueItem> data);
BlockQueueStatus_T Front(std::vector<DataQueueItem> *data) const;
BlockQueueStatus_T Pop();
bool Destroy();
private:
struct NodeInfo {
std::unique_ptr<cudaEvent_t> event_;
std::vector<DataQueueItem> data_;
};
std::vector<size_t> shape_;
cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
};
class GpuQueue : public DataQueue {
public:
GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
virtual ~GpuQueue();
BlockQueueStatus_T Push(std::vector<DataQueueItem> data);
BlockQueueStatus_T Front(std::vector<DataQueueItem> *data) const;
BlockQueueStatus_T Pop();
bool Destroy();
private:
struct NodeInfo {
std::unique_ptr<cudaEvent_t> event_;
std::vector<DataQueueItem> data_;
};
void *buffer_;
std::vector<size_t> shape_;
size_t len_;
cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
};
} // namespace device
} // namespace mindspore
#endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_DATA_QUEUE_H_
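Two queue flavours are declared here: GpuQueue copies each row into a device buffer pre-allocated at Create() time, so shapes are fixed, whereas GpuDataQueueDynamic allocates device memory per item inside Push(), which is what lets row shapes vary. A hedged orientation sketch of the consumer side; in practice these calls are routed through the BlockingQueue wrapper and DataQueueMgr rather than used directly like this:

GpuDataQueueDynamic queue(/*capacity=*/2);
// Producer side: queue.Push(items) copies each item.data_ptr_ into freshly allocated device
// memory and records a CUDA event on the copy stream.
std::vector<mindspore::device::DataQueueItem> row;
queue.Front(&row);  // waits on that event, then invokes the registered host-release callback
// row[i].device_addr_ now holds the device copy of tensor i; row[i].shapes_ describes its shape.
queue.Pop();        // frees the ring slot for the producer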

View File

@ -18,7 +18,6 @@
#include "plugin/device/gpu/hal/device/gpu_common.h"
#include "utils/log_adapter.h"
#include "include/common/utils/convert_utils.h"
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
namespace mindspore {
namespace device {
@ -94,7 +93,6 @@ bool GPUDeviceManager::set_cur_device_id(uint32_t device_id) {
if (!dev_id_init_) {
dev_id_init_ = true;
cur_dev_id_ = device_id;
mindspore::device::GpuBufferMgr::GetInstance().set_device_id(UintToInt(device_id));
return true;
} else {
MS_LOG(ERROR) << "Device already been set.";

View File

@ -20,7 +20,7 @@
#include "plugin/device/gpu/hal/device/gpu_device_address.h"
#include "plugin/device/gpu/hal/device/cuda_driver.h"
#include "plugin/device/gpu/hal/device/gpu_event.h"
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include "runtime/data_queue/data_queue_mgr.h"
#include "plugin/device/gpu/hal/device/gpu_device_manager.h"
#include "plugin/device/gpu/hal/device/gpu_memory_allocator.h"
#include "plugin/device/gpu/hal/device/distribution/collective_init.h"
@ -281,13 +281,13 @@ void GPUKernelRuntime::ReleaseDeviceRes() {
}
}
#endif
if (GpuBufferMgr::GetInstance().IsInit()) {
if (!GpuBufferMgr::GetInstance().IsClosed()) {
if (!GpuBufferMgr::GetInstance().CloseNotify()) {
if (DataQueueMgr::GetInstance().IsInit()) {
if (!DataQueueMgr::GetInstance().IsClosed()) {
if (!DataQueueMgr::GetInstance().CloseNotify()) {
MS_LOG(ERROR) << "Could not close gpu data queue.";
}
}
CHECK_OP_RET_WITH_ERROR(GpuBufferMgr::GetInstance().Destroy(), "Could not destroy gpu data queue.");
CHECK_OP_RET_WITH_ERROR(DataQueueMgr::GetInstance().Destroy(), "Could not destroy gpu data queue.");
}
// Destroy remaining memory swap events and free host memory.

View File

@ -25,7 +25,7 @@
#include "plugin/device/gpu/hal/device/gpu_stream_assign.h"
#include "plugin/device/gpu/hal/device/distribution/collective_init.h"
#include "plugin/device/gpu/hal/device/gpu_device_manager.h"
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include "runtime/data_queue/data_queue_mgr.h"
#include "kernel/common_utils.h"
#include "plugin/device/gpu/hal/device/gpu_common.h"
#include "plugin/device/gpu/kernel/cuda_impl/cuda_ops/cuda_common.h"
@ -167,11 +167,11 @@ void GPUDeviceContext::Destroy() {
}
#endif
if (GpuBufferMgr::GetInstance().IsInit()) {
if (!GpuBufferMgr::GetInstance().IsClosed() && !GpuBufferMgr::GetInstance().CloseNotify()) {
if (DataQueueMgr::GetInstance().IsInit()) {
if (!DataQueueMgr::GetInstance().IsClosed() && !DataQueueMgr::GetInstance().CloseNotify()) {
MS_LOG(ERROR) << "Could not close gpu data queue.";
}
CHECK_OP_RET_WITH_ERROR(GpuBufferMgr::GetInstance().Destroy(), "Could not destroy gpu data queue.");
CHECK_OP_RET_WITH_ERROR(DataQueueMgr::GetInstance().Destroy(), "Could not destroy gpu data queue.");
}
// Release stream, cudnn and cublas handle, etc.

View File

@ -17,13 +17,13 @@
#include "plugin/device/gpu/kernel/data/dataset_init_kernel.h"
#include "plugin/device/gpu/kernel/data/dataset_utils.h"
#include "kernel/common_utils.h"
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include "runtime/data_queue/data_queue_mgr.h"
#include "plugin/device/gpu/hal/device/gpu_memory_allocator.h"
#include "include/common/utils/convert_utils.h"
namespace mindspore {
namespace kernel {
using mindspore::device::GpuBufferMgr;
using mindspore::device::DataQueueMgr;
DatasetInitKernelMod::DatasetInitKernelMod() : total_bytes_(0) {}
@ -59,7 +59,7 @@ bool DatasetInitKernelMod::Launch(const std::vector<AddressPtr> &, const std::ve
<< len << "].";
}
auto status = GpuBufferMgr::GetInstance().Create(queue_name_, addr, shapes_, buffer_q_capacity_);
auto status = DataQueueMgr::GetInstance().Create(queue_name_, addr, shapes_, buffer_q_capacity_);
if (status) {
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', init Dataset Failed. len: " << len << ", status:" << status;
}

View File

@ -19,6 +19,7 @@
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include "include/common/utils/convert_utils.h"
#include "plugin/device/gpu/kernel/data/dataset_utils.h"
#include "kernel/common_utils.h"
@ -26,7 +27,7 @@
#ifndef ENABLE_SECURITY
#include "profiler/device/gpu/gpu_profiling.h"
#endif
#include "plugin/device/gpu/hal/device/gpu_buffer_mgr.h"
#include "runtime/data_queue/data_queue_mgr.h"
#include "plugin/device/gpu/hal/device/gpu_common.h"
#ifdef ENABLE_DUMP_IR
#include "include/common/debug/rdr/recorder_manager.h"
@ -34,14 +35,15 @@
namespace mindspore {
namespace kernel {
using mindspore::device::GpuBufferMgr;
using mindspore::device::DataQueueMgr;
DatasetIteratorKernelMod::DatasetIteratorKernelMod()
: is_opened_(false), profiling_enable_(false), profiling_op_(nullptr) {}
DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { GpuBufferMgr::GetInstance().Close(queue_name_); }
DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { DataQueueMgr::GetInstance().Close(queue_name_); }
bool DatasetIteratorKernelMod::Init(const CNodePtr &kernel_node) {
dynamic_shape_ = common::AnfAlgo::IsDynamicShape(kernel_node);
MS_EXCEPTION_IF_NULL(kernel_node);
kernel_node_ = kernel_node;
kernel_name_ = common::AnfAlgo::GetCNodeName(kernel_node);
@ -54,12 +56,15 @@ bool DatasetIteratorKernelMod::Init(const CNodePtr &kernel_node) {
}
std::transform(type_ptrs.begin(), type_ptrs.end(), std::back_inserter(types_),
[](const TypePtr &value) { return value->type_id(); });
for (size_t i = 0; i < shapes.size(); i++) {
int unit = UnitSizeInBytes(type_ptrs[i]->type_id());
int nums = ElementNums(shapes[i]);
int bytes = unit * nums;
output_size_list_.push_back(bytes);
if (dynamic_shape_) {
output_size_list_ = std::vector<size_t>(shapes.size(), 0);
} else {
for (size_t i = 0; i < shapes.size(); i++) {
int unit = UnitSizeInBytes(type_ptrs[i]->type_id());
int nums = ElementNums(shapes[i]);
int bytes = unit * nums;
output_size_list_.push_back(bytes);
}
}
is_need_retrieve_output_shape_ = true;
@ -77,9 +82,21 @@ bool DatasetIteratorKernelMod::Init(const CNodePtr &kernel_node) {
return true;
}
int DatasetIteratorKernelMod::Resize(const BaseOperatorPtr &base_operator, const std::vector<KernelTensorPtr> &inputs,
const std::vector<KernelTensorPtr> &outputs,
const std::map<uint32_t, tensor::TensorPtr> &others) {
if (dynamic_shape_) {
auto data_kernel = kernel_node_.lock();
if (!device::PopDataFromDataQueue(data_kernel)) {
return KernelErrorCode::KRET_RESIZE_FAILED;
}
}
return KernelErrorCode::KRET_OK;
}
void DatasetIteratorKernelMod::InitSizeLists() { return; }
bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataQueueItem> *data) {
uint64_t start_time_stamp = 0;
uint32_t queue_size = 0;
#ifndef ENABLE_SECURITY
@ -92,10 +109,10 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
profiling_enable_ = profiler_inst->GetEnableFlag();
if (profiling_enable_) {
start_time_stamp = profiling_op_->GetTimeStamp();
queue_size = GpuBufferMgr::GetInstance().Size(queue_name_);
queue_size = DataQueueMgr::GetInstance().Size(queue_name_);
}
#endif
auto ret = GpuBufferMgr::GetInstance().Front(queue_name_, data);
auto ret = DataQueueMgr::GetInstance().Front(queue_name_, data);
if (ret == device::SUCCESS) {
#ifndef ENABLE_SECURITY
if (profiling_enable_) {
@ -133,8 +150,11 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
bool DatasetIteratorKernelMod::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
const std::vector<AddressPtr> &outputs, void *stream) {
if (dynamic_shape_) {
return true;
}
if (!is_opened_) {
auto ret = GpuBufferMgr::GetInstance().Open(queue_name_, output_size_list_);
auto ret = DataQueueMgr::GetInstance().OpenDynamicBufQueue(queue_name_);
if (ret != device::BlockQueueStatus_T::SUCCESS) {
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', gpu Queue(" << queue_name_ << ") Open Failed: " << ret;
}
@ -154,14 +174,16 @@ bool DatasetIteratorKernelMod::Launch(const std::vector<AddressPtr> &, const std
reinterpret_cast<cudaStream_t>(stream)),
"Cuda Memcpy Failed");
}
CHECK_CUDA_RET_WITH_EXCEPT(kernel_node_, cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(stream)),
"cudaStreamSynchronize failed");
(void)GpuBufferMgr::GetInstance().Pop(queue_name_);
(void)DataQueueMgr::GetInstance().Pop(queue_name_);
return true;
}
void DatasetIteratorKernelMod::SyncData() {
if (dynamic_shape_) {
return;
}
std::vector<std::vector<size_t>> shapes;
for (const auto &item : output_data_) {
std::vector<size_t> shape;

View File

@ -20,13 +20,14 @@
#include <memory>
#include <string>
#include <vector>
#include <map>
#include "plugin/device/gpu/kernel/data/dataset_profiling.h"
#include "plugin/device/gpu/kernel/gpu_kernel.h"
#include "plugin/device/gpu/kernel/gpu_kernel_factory.h"
#include "plugin/device/gpu/hal/device/blocking_queue.h"
#include "runtime/data_queue/blocking_queue.h"
namespace mindspore {
namespace kernel {
using mindspore::device::DataItemGpu;
using mindspore::device::DataQueueItem;
class DatasetIteratorKernelMod : public DeprecatedNativeGpuKernelMod {
public:
@ -36,19 +37,22 @@ class DatasetIteratorKernelMod : public DeprecatedNativeGpuKernelMod {
bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs, void *stream_ptr) override;
bool Init(const CNodePtr &kernel_node) override;
int Resize(const BaseOperatorPtr &base_operator, const std::vector<KernelTensorPtr> &inputs,
const std::vector<KernelTensorPtr> &outputs, const std::map<uint32_t, tensor::TensorPtr> &others) override;
protected:
void InitSizeLists() override;
void SyncData() override;
private:
bool ReadDevice(std::vector<DataItemGpu> *data);
bool ReadDevice(std::vector<DataQueueItem> *data);
std::string queue_name_;
bool is_opened_;
bool profiling_enable_;
std::shared_ptr<GetNextProfiling> profiling_op_;
std::vector<TypeId> types_;
std::vector<DataItemGpu> output_data_;
std::vector<DataQueueItem> output_data_;
bool dynamic_shape_;
};
MS_REG_GPU_KERNEL(GetNext, DatasetIteratorKernelMod)

View File

@ -0,0 +1,5 @@
file(GLOB_RECURSE BUILDER_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
"*.cc")
set_property(SOURCE ${BUILDER_SRC_LIST} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_DEVICE)
add_library(_mindspore_runtime_data_queue_obj OBJECT ${BUILDER_SRC_LIST})

View File

@ -0,0 +1,89 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "runtime/data_queue/blocking_queue.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace device {
const size_t kTimeout = 100;
void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }
BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataQueueItem> &data, unsigned int) {
std::unique_lock<std::mutex> locker(mutex_);
if (queue_->IsFull()) {
if (not_full_cond_.wait_for(locker, std::chrono::microseconds(kTimeout)) == std::cv_status::timeout) {
return TIMEOUT;
}
}
auto ret = queue_->Push(data);
if (ret) {
return ret;
}
not_empty_cond_.notify_one();
return SUCCESS;
}
BlockQueueStatus_T BlockingQueue::Front(std::vector<DataQueueItem> *data) {
std::unique_lock<std::mutex> locker(mutex_);
bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
if (!timeout) {
return TIMEOUT;
}
return queue_->Front(data);
}
BlockQueueStatus_T BlockingQueue::Pop() {
std::unique_lock<std::mutex> locker(mutex_);
not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
auto ret = queue_->Pop();
if (ret) {
return ret;
}
not_full_cond_.notify_one();
return SUCCESS;
}
BlockQueueStatus_T BlockingQueue::Create(const std::shared_ptr<DataQueue> &data_queue) {
this->queue_ = data_queue;
return SUCCESS;
}
BlockQueueStatus_T BlockingQueue::Clear() {
std::unique_lock<std::mutex> locker(mutex_);
while (Size() > 0) {
std::vector<DataQueueItem> data;
auto ret = queue_->Front(&data);
if (ret) {
return ret;
}
ret = queue_->Pop();
if (ret) {
return ret;
}
}
return SUCCESS;
}
bool BlockingQueue::Destroy() {
if (queue_ != nullptr) {
return queue_->Destroy();
} else {
return true;
}
}
} // namespace device
} // namespace mindspore

View File

@ -0,0 +1,56 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_
#include <unistd.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <cstring>
#include <string>
#include <vector>
#include <condition_variable>
#include <functional>
#include "runtime/data_queue/data_queue.h"
namespace mindspore {
namespace device {
class BlockingQueue {
public:
BlockingQueue() : queue_(nullptr) {}
~BlockingQueue() = default;
BlockQueueStatus_T Create(const std::shared_ptr<DataQueue> &data_queue);
void RegisterRelease(const std::function<void(void *, int32_t)> &func);
BlockQueueStatus_T Push(const std::vector<DataQueueItem> &data, unsigned int timeout_in_sec);
BlockQueueStatus_T Front(std::vector<DataQueueItem> *data);
BlockQueueStatus_T Pop();
BlockQueueStatus_T Clear();
bool Destroy();
size_t Size() { return queue_->Size(); }
size_t Capacity() { return queue_->Capacity(); }
private:
std::mutex mutex_;
std::condition_variable not_full_cond_;
std::condition_variable not_empty_cond_;
std::shared_ptr<DataQueue> queue_;
};
} // namespace device
} // namespace mindspore
#endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_

View File

@ -0,0 +1,31 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "runtime/data_queue/data_queue.h"
#include <string>
#include "utils/ms_context.h"
namespace mindspore {
namespace device {
DataQueue::DataQueue(const size_t capacity) : head_(0), tail_(0), size_(0), capacity_(capacity) {
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
const std::string &device_target = ms_context->get_param<std::string>(MS_CTX_DEVICE_TARGET);
uint32_t device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
device_context_ = DeviceContextManager::GetInstance().GetOrCreateDeviceContext({device_target, device_id});
device_context_->Initialize();
}
} // namespace device
} // namespace mindspore

View File

@ -0,0 +1,63 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_DATA_QUEUE_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_DATA_QUEUE_H_
#include <unistd.h>
#include <string>
#include <memory>
#include <vector>
#include <functional>
#include "runtime/hardware/device_context_manager.h"
#include "mindspore/core/utils/data_queue_handler.h"
namespace mindspore {
namespace device {
class DataQueue {
public:
explicit DataQueue(const size_t capacity);
virtual ~DataQueue() = default;
virtual void RegisterRelease(const std::function<void(void *, int32_t)> &func) { host_release_ = func; }
virtual inline bool IsEmpty() const { return size_ == 0; }
virtual inline bool IsFull() const { return size_ == capacity_; }
virtual BlockQueueStatus_T Push(std::vector<DataQueueItem> data) = 0;
virtual BlockQueueStatus_T Front(std::vector<DataQueueItem> *data) const = 0;
virtual BlockQueueStatus_T Pop() = 0;
virtual bool Destroy() = 0;
virtual size_t Size() { return size_; }
virtual size_t Capacity() { return capacity_; }
protected:
size_t head_;
size_t tail_;
size_t size_;
size_t capacity_;
std::function<void(void *, int32_t)> host_release_;
DeviceContext *device_context_;
private:
DataQueue(const DataQueue &) = delete;
DataQueue &operator=(const DataQueue &) = delete;
};
} // namespace device
} // namespace mindspore
#endif  // MINDSPORE_CCSRC_RUNTIME_DEVICE_DATA_QUEUE_H_
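To illustrate the contract above, here is a minimal sketch of a plain host-memory DataQueue and how BlockingQueue wraps it. HostDataQueue is a hypothetical name used only for this example and is not part of the change; the base-class constructor resolves the current device context, so a normal MindSpore runtime environment is assumed.

#include <memory>
#include <vector>
#include "runtime/data_queue/data_queue.h"
#include "runtime/data_queue/blocking_queue.h"
namespace mindspore {
namespace device {
class HostDataQueue : public DataQueue {
 public:
  explicit HostDataQueue(size_t capacity) : DataQueue(capacity), items_(capacity) {}
  BlockQueueStatus_T Push(std::vector<DataQueueItem> data) override {
    if (IsFull()) {
      return INTERNAL_ERROR;  // BlockingQueue normally prevents this by waiting on not_full_cond_.
    }
    items_[tail_] = std::move(data);
    tail_ = (tail_ + 1) % capacity_;
    ++size_;
    return SUCCESS;
  }
  BlockQueueStatus_T Front(std::vector<DataQueueItem> *data) const override {
    if (IsEmpty()) {
      return INTERNAL_ERROR;
    }
    *data = items_[head_];
    return SUCCESS;
  }
  BlockQueueStatus_T Pop() override {
    if (IsEmpty()) {
      return INTERNAL_ERROR;
    }
    head_ = (head_ + 1) % capacity_;
    --size_;
    return SUCCESS;
  }
  bool Destroy() override { return true; }
 private:
  std::vector<std::vector<DataQueueItem>> items_;
};
// Typical use mirrors DataQueueMgr: wrap the queue in a BlockingQueue, which adds the
// mutex/condition-variable blocking semantics:
//   auto blocking = std::make_shared<BlockingQueue>();
//   blocking->Create(std::make_shared<HostDataQueue>(2));
//   blocking->Push(items, /*timeout_in_sec=*/30);
//   std::vector<DataQueueItem> out;
//   blocking->Front(&out);
//   blocking->Pop();
}  // namespace device
}  // namespace mindspore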

View File

@ -0,0 +1,304 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "runtime/data_queue/data_queue_mgr.h"
#if ENABLE_GPU
#include "plugin/device/gpu/hal/device/gpu_data_queue.h"
#elif ENABLE_D
#include "plugin/device/ascend/hal/device/ascend_data_queue.h"
#endif
#include <algorithm>
#include <utility>
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
namespace py = pybind11;
namespace mindspore {
namespace device {
DataQueueMgr &DataQueueMgr::GetInstance() noexcept {
static DataQueueMgr instance;
return instance;
}
BlockQueueStatus_T DataQueueMgr::Create(const std::string &channel_name, void *addr, const std::vector<size_t> &shape,
const size_t &capacity) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " created.";
if (name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue already exist: " << channel_name;
return QUEUE_EXIST;
}
#if ENABLE_GPU
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
std::shared_ptr<DataQueue> gpu_queue = std::make_shared<GpuQueue>(addr, shape, capacity);
BlockQueueStatus_T rt = queue->Create(gpu_queue);
if (rt != SUCCESS) {
MS_LOG(ERROR) << "Queue: " << channel_name << "create failed: " << rt;
return rt;
}
(void)name_queue_map_.insert(std::make_pair(channel_name, queue));
init_ = true;
return SUCCESS;
#else
MS_LOG(ERROR) << "Static data queue only support GPU target.";
return INTERNAL_ERROR;
#endif
}
BlockQueueStatus_T DataQueueMgr::Open(const std::string &channel_name,
const std::function<void(void *, int32_t)> func) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " open.";
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
name_queue_map_[channel_name]->RegisterRelease(func);
open_by_dataset_++;
return SUCCESS;
}
BlockQueueStatus_T DataQueueMgr::OpenDynamicBufQueue(const std::string &channel_name,
const std::function<void(void *, int32_t)> func) {
std::unique_lock<std::mutex> locker(mutex_);
if (!name_queue_map_.count(channel_name)) {
BlockQueueStatus_T status = CreateDynamicBufQueue(channel_name, default_capacity_);
MS_EXCEPTION_IF_CHECK_FAIL(status == BlockQueueStatus_T::SUCCESS, "Create dynamic buffer queue failed");
MS_LOG_INFO << "Create dynamic buffer queue: " << channel_name;
cv_.notify_all();
}
name_queue_map_[channel_name]->RegisterRelease(func);
open_by_dataset_++;
return SUCCESS;
}
BlockQueueStatus_T DataQueueMgr::OpenDynamicBufQueue(const std::string &channel_name) {
std::unique_lock<std::mutex> locker(mutex_);
auto time_out = cv_.wait_for(locker, std::chrono::seconds(MAX_WAIT_TIME_IN_SEC),
[this, &channel_name] { return name_queue_map_.count(channel_name); });
if (!time_out) {
return TIMEOUT;
}
return SUCCESS;
}
BlockQueueStatus_T DataQueueMgr::CreateDynamicBufQueue(const std::string &channel_name, const size_t &capacity) {
if (name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue already exist: " << channel_name;
return QUEUE_EXIST;
}
#if ENABLE_GPU
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
std::shared_ptr<DataQueue> device_queue = std::make_shared<GpuDataQueueDynamic>(capacity);
#elif ENABLE_D
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
std::shared_ptr<DataQueue> device_queue = std::make_shared<AscendDataQueueDynamic>(capacity);
#else
MS_LOG(ERROR) << "Dynamic data queue only support Ascend/GPU target.";
return QUEUE_EXIST;
#endif
#if ENABLE_GPU || ENABLE_D
BlockQueueStatus_T rt = queue->Create(device_queue);
if (rt != SUCCESS) {
MS_LOG(ERROR) << "Queue: " << channel_name << "create failed: " << rt;
return rt;
}
(void)name_queue_map_.insert(std::make_pair(channel_name, queue));
init_ = true;
return SUCCESS;
#endif
}
BlockQueueStatus_T DataQueueMgr::Open(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return SUCCESS;
}
BlockQueueStatus_T DataQueueMgr::Push(const std::string &channel_name, const std::vector<DataQueueItem> &data,
unsigned int timeout_in_sec) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Push(data, timeout_in_sec);
}
BlockQueueStatus_T DataQueueMgr::Front(const std::string &channel_name, std::vector<DataQueueItem> *data) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Front(data);
}
BlockQueueStatus_T DataQueueMgr::Pop(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Pop();
}
BlockQueueStatus_T DataQueueMgr::Clear(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Clear();
}
void DataQueueMgr::Close(const std::string &channel_name) noexcept {
MS_LOG(INFO) << "Close the queue: " << channel_name;
return;
}
bool DataQueueMgr::IsInit() const { return init_; }
bool DataQueueMgr::IsClosed() const { return closed_; }
bool DataQueueMgr::Destroy() {
MS_LOG(INFO) << "Destroy all data queue.";
for (auto iter = name_queue_map_.begin(); iter != name_queue_map_.end(); ++iter) {
std::shared_ptr<BlockingQueue> queue = iter->second;
if (queue != nullptr) {
if (!queue->Destroy()) {
return false;
}
queue.reset();
}
}
name_queue_map_.clear();
return true;
}
inline bool DataQueueMgr::isCreated(const std::string &channel_name) {
if (name_queue_map_.count(channel_name) != 0) {
return true;
}
return false;
}
bool DataQueueMgr::CloseNotify() {
py::gil_scoped_release release;
bool result = true;
// lock scope
{
std::lock_guard<std::mutex> lk(close_mutex_);
// set closed_ to true so that every dataset retry loop can exit its while loop
closed_ = true;
}
// wait for the dataset threads' ack
for (int i = 0; i < open_by_dataset_; i++) {
if (sema.Wait() == false) {
MS_LOG(ERROR) << "time out of receiving signals";
result = false;
}
MS_LOG(DEBUG) << "receive one signal (" << (i + 1) << "/" << open_by_dataset_ << ")";
}
return result;
}
void DataQueueMgr::CloseConfirm() { sema.Signal(); }
size_t DataQueueMgr::Size(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
return name_queue_map_.at(channel_name)->Size();
}
size_t DataQueueMgr::Capacity(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
return name_queue_map_.at(channel_name)->Capacity();
}
bool PopDataFromDataQueue(const AnfNodePtr &data_kernel) {
auto queue_name = common::AnfAlgo::GetNodeAttr<std::string>(data_kernel, "shared_name");
device::DataQueueMgr &buf_mgr = device::DataQueueMgr::GetInstance();
auto ret = buf_mgr.OpenDynamicBufQueue(queue_name);
MS_EXCEPTION_IF_CHECK_FAIL(ret == device::BlockQueueStatus_T::SUCCESS, "Open dynamic data queue failed");
std::vector<device::DataQueueItem> data;
auto kernel_info = dynamic_cast<device::KernelInfo *>(data_kernel->kernel_info());
buf_mgr.Front(queue_name, &data);
buf_mgr.Pop(queue_name);
std::vector<std::shared_ptr<device::DeviceAddress>> device_tensors;
for (auto &device_tensor : kernel_info->output_address_list()) {
MS_EXCEPTION_IF_NULL(device_tensor);
device_tensors.push_back(device_tensor);
}
MS_EXCEPTION_IF_CHECK_FAIL(data.size() == device_tensors.size(),
"The number of data tensor popped from dynamic queue is not correct");
std::vector<std::vector<size_t>> shapes;
std::vector<TypeId> types;
std::vector<size_t> output_size_list;
for (size_t i = 0; i < data.size(); ++i) {
device_tensors[i]->set_ptr(data[i].device_addr_);
device_tensors[i]->SetSize(data[i].data_len_);
device_tensors[i]->set_from_mem_pool(true);
output_size_list.push_back(data[i].data_len_);
std::vector<size_t> shape;
std::transform(data[i].shapes_.begin(), data[i].shapes_.end(), std::back_inserter(shape), LongToSize);
kernel_info->SetOutputAddr(device_tensors[i], i);
shapes.push_back(shape);
types.push_back(common::AnfAlgo::GetOutputInferDataType(data_kernel, i));
}
auto kernel_mod = kernel_info->MutableKernelMod();
kernel_mod->SetOutputSizeList(output_size_list);
common::AnfAlgo::SetOutputInferTypeAndShape(types, shapes, data_kernel.get());
return true;
}
namespace {
struct DataQueueHandlerRegister {
DataQueueHandlerRegister() noexcept {
DataQueueHandler::SetOpenHandler(
[](const std::string channel_name, const std::function<void(void *, int32_t)> func) -> BlockQueueStatus_T {
return DataQueueMgr::GetInstance().Open(channel_name, func);
});
DataQueueHandler::SetOpenDynamicBufQueueHandler(
[](const std::string &channel_name, const std::function<void(void *, int32_t)> func) -> BlockQueueStatus_T {
return DataQueueMgr::GetInstance().OpenDynamicBufQueue(channel_name, func);
});
DataQueueHandler::SetIsClosedHandler([]() -> bool { return DataQueueMgr::GetInstance().IsClosed(); });
DataQueueHandler::SetCloseConfirmHandler([]() { DataQueueMgr::GetInstance().CloseConfirm(); });
DataQueueHandler::SetCloseHandler([](const std::string &channel) { DataQueueMgr::GetInstance().Close(channel); });
DataQueueHandler::SetClearHandler([](const std::string &channel) -> device::BlockQueueStatus_T {
return DataQueueMgr::GetInstance().Clear(channel);
});
DataQueueHandler::SetPushHandler([](const std::string &channel, const std::vector<device::DataQueueItem> &items,
unsigned int timeout) -> device::BlockQueueStatus_T {
return DataQueueMgr::GetInstance().Push(channel, items, timeout);
});
}
} callback_register;
} // namespace
} // namespace device
} // namespace mindspore
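For reference, a minimal sketch of the producer side of the dynamic queue added here: a data-pipeline thread opens (and lazily creates) the channel, then pushes one row described by DataQueueItem; the GetNext kernel later drains it through PopDataFromDataQueue. The channel name, host buffer, and dtype string are illustrative assumptions.

#include <vector>
#include "runtime/data_queue/data_queue_mgr.h"
void PushOneRow(void *host_ptr, size_t nbytes, const std::vector<int64_t> &shape) {
  using mindspore::device::DataQueueItem;
  using mindspore::device::DataQueueMgr;
  auto &mgr = DataQueueMgr::GetInstance();
  // Opens (and on first use creates) the dynamic queue; the release callback is a no-op here.
  (void)mgr.OpenDynamicBufQueue("example_queue", [](void *, int32_t) {});
  DataQueueItem item;
  item.data_ptr_ = host_ptr;   // host-side buffer produced by the pipeline
  item.data_len_ = nbytes;     // byte size of this row
  item.shapes_ = shape;        // per-row shape; no min/max shape needed any more
  item.data_type_ = "float32";
  (void)mgr.Push("example_queue", {item}, /*timeout_in_sec=*/30);
}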

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_GPU_BUFFER_MGR_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_GPU_BUFFER_MGR_H_
#ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_DATA_QUEUE_MGR_H_
#define MINDSPORE_CCSRC_RUNTIME_DEVICE_DATA_QUEUE_MGR_H_
#include <unistd.h>
#include <cstring>
@ -25,7 +25,7 @@
#include <vector>
#include <string>
#include <memory>
#include "plugin/device/gpu/hal/device/blocking_queue.h"
#include "runtime/data_queue/blocking_queue.h"
#define EXPORT __attribute__((visibility("default")))
@ -60,31 +60,32 @@ class Semaphore {
int count_;
};
class GpuBufferMgr {
class DataQueueMgr {
public:
EXPORT GpuBufferMgr() : cur_dev_id_(0), init_(false), closed_(false), open_by_dataset_(0) {}
EXPORT DataQueueMgr() : cur_dev_id_(0), init_(false), closed_(false), open_by_dataset_(0) {}
EXPORT virtual ~GpuBufferMgr() = default;
EXPORT virtual ~DataQueueMgr() = default;
EXPORT static GpuBufferMgr &GetInstance() noexcept;
EXPORT static DataQueueMgr &GetInstance() noexcept;
EXPORT BlockQueueStatus_T Create(const std::string &channel_name, void *addr, const std::vector<size_t> &shape,
const size_t &capacity);
// call for Push thread
EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector<size_t> &shape,
std::function<void(void *, int32_t)> func);
EXPORT BlockQueueStatus_T Open(const std::string &channel_name, std::function<void(void *, int32_t)> func);
// call for Front/Pop thread
EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector<size_t> &shape);
EXPORT BlockQueueStatus_T Push(const std::string &channel_name, const std::vector<DataItemGpu> &data,
EXPORT BlockQueueStatus_T Open(const std::string &channel_name);
EXPORT BlockQueueStatus_T Push(const std::string &channel_name, const std::vector<DataQueueItem> &data,
unsigned int timeout_in_sec);
EXPORT BlockQueueStatus_T Front(const std::string &channel_name, std::vector<DataItemGpu> *data);
EXPORT BlockQueueStatus_T Front(const std::string &channel_name, std::vector<DataQueueItem> *data);
EXPORT BlockQueueStatus_T Pop(const std::string &channel_name);
EXPORT BlockQueueStatus_T Clear(const std::string &channel_name);
EXPORT void set_device_id(int device_id);
EXPORT BlockQueueStatus_T OpenDynamicBufQueue(const std::string &channel_name,
const std::function<void(void *, int32_t)> func);
EXPORT BlockQueueStatus_T CreateDynamicBufQueue(const std::string &channel_name, const size_t &capacity);
EXPORT BlockQueueStatus_T OpenDynamicBufQueue(const std::string &channel_name);
EXPORT void Close(const std::string &channel_name) noexcept;
@ -105,24 +106,26 @@ class GpuBufferMgr {
EXPORT size_t Capacity(const std::string &channel_name);
private:
void set_device() const;
int cur_dev_id_;
bool init_;
bool closed_;
std::mutex mutex_;
std::mutex close_mutex_;
std::condition_variable cv_;
// how many queues opened by dataset
int open_by_dataset_;
Semaphore sema;
bool dynamic_shape_{false};
size_t default_capacity_{2};
std::map<std::string, std::shared_ptr<BlockingQueue>> name_queue_map_;
inline bool isCreated(const std::string &channel_name);
GpuBufferMgr(const GpuBufferMgr &) = delete;
GpuBufferMgr &operator=(const GpuBufferMgr &) = delete;
DataQueueMgr(const DataQueueMgr &) = delete;
DataQueueMgr &operator=(const DataQueueMgr &) = delete;
};
bool PopDataFromDataQueue(const AnfNodePtr &data_kernel);
} // namespace device
} // namespace mindspore

View File

@ -160,8 +160,6 @@ void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *co
}
if (common::AnfAlgo::IsDynamicShape(data_kernel_)) {
kernel::UpdateNodeShape(data_kernel_);
AnfAlgo::UpdateOutputAddrSize(kernel_info_, data_kernel_);
AnfAlgo::UpdateInternalParameterShape(internal_parameters_, data_kernel_);
}
PostRun(context);

View File

@ -167,6 +167,9 @@ class DeviceContext {
// Ensure the thread safety for creating stream.
std::mutex stream_mutex_;
// Ensure the thread safety for allocating device memory.
mutable std::mutex alloc_mem_mutex_;
// The collective communication library.
CollectiveCommunicationLib *collective_comm_lib_;
};

View File

@ -71,7 +71,7 @@ class RuntimeCache {
std::map<size_t, std::pair<AnfNodePtr, size_t>> prev_node_output_map_;
std::string device_target_;
ssize_t output_tensor_num_ = -1;
CacheBool is_real_kernel_ = CacheBool::UNCACHED;
CacheBool is_real_kernel_ = Uncached;
};
// Interface for device kernel program information.
class KernelInfoDevice {

View File

@ -223,8 +223,8 @@ bool AnfUtils::IsRealKernel(const AnfNodePtr &node) {
auto kernel_info = cnode->kernel_info();
if (kernel_info) {
auto runtime_cache = kernel_info->runtime_cache();
if (runtime_cache.runtime_cache().is_real_kernel() != CacheBool::UNCACHED) {
return (runtime_cache.runtime_cache().is_real_kernel() == CacheBool::TRUE);
if (runtime_cache.runtime_cache().is_real_kernel() != Uncached) {
return (runtime_cache.runtime_cache().is_real_kernel() == True);
}
}
bool res = !IsOneOfPrimitive(cnode->input(kAnfPrimitiveIndex), virtual_prims);
@ -232,9 +232,9 @@ bool AnfUtils::IsRealKernel(const AnfNodePtr &node) {
if (kernel_info) {
auto runtime_cache = kernel_info->runtime_cache();
if (res) {
runtime_cache.runtime_cache().set_real_kernel(CacheBool::TRUE);
runtime_cache.runtime_cache().set_real_kernel(True);
} else {
runtime_cache.runtime_cache().set_real_kernel(CacheBool::FALSE);
runtime_cache.runtime_cache().set_real_kernel(False);
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_UTILS_CALLBACK_HANDLER_H_
#define MINDSPORE_CORE_UTILS_CALLBACK_HANDLER_H_
#include <utility>
#define HANDLER_DEFINE(return_type, name, args...) \
public: \
template <typename... Args> \
static return_type name(const Args &... argss) { \
if (name##_handler_ == nullptr) { \
return return_type(); \
} \
return name##_handler_(argss...); \
} \
using name##Handler = std::function<decltype(name<args>)>; \
static void Set##name##Handler(name##Handler handler) { name##_handler_ = std::move(handler); } \
\
private: \
inline static name##Handler name##_handler_;
#endif // MINDSPORE_CORE_UTILS_CALLBACK_HANDLER_H_
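A short usage sketch of the macro above; ExampleHandler and its handlers are illustrative names, not MindSpore APIs. The macro generates a static forwarding entry point plus a Set...Handler registration function, so frontend code can call into a backend that registers the real implementation at load time; if no handler was registered, the wrapper returns a default-constructed value.

#include <functional>
#include <string>
#include "utils/callback_handler.h"
class ExampleHandler {
  HANDLER_DEFINE(int, Add, int, int);                 // generates Add(...) and SetAddHandler(...)
  HANDLER_DEFINE(void, Notify, const std::string &);  // generates Notify(...) and SetNotifyHandler(...)
};
// Backend side: install the real implementations once, e.g. from a static registrar.
void RegisterExampleHandlers() {
  ExampleHandler::SetAddHandler([](int a, int b) -> int { return a + b; });
  ExampleHandler::SetNotifyHandler([](const std::string &) { /* e.g. forward to a logger */ });
}
// Frontend side: call through the handler without linking against the backend.
int CallAdd() { return ExampleHandler::Add(1, 2); }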

View File

@ -0,0 +1,20 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/data_queue_handler.h"
namespace {
// Empty source file trick to avoid symbol export/import problems on Windows.
} // namespace

View File

@ -0,0 +1,50 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_INCLUDE_COMMON_UTILS_DATA_QUEUE_HANDLER_H_
#define MINDSPORE_CCSRC_INCLUDE_COMMON_UTILS_DATA_QUEUE_HANDLER_H_
#include <string>
#include <utility>
#include <functional>
#include <vector>
#include "utils/visible.h"
#include "utils/callback_handler.h"
namespace mindspore {
namespace device {
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, QUEUE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
struct DataQueueItem {
int32_t worker_id_{0};
std::string data_type_;
size_t data_len_{0};
void *data_ptr_{nullptr};
std::vector<int64_t> shapes_;
void *device_addr_{nullptr};
};
} // namespace device
class MS_EXPORT DataQueueHandler {
HANDLER_DEFINE(device::BlockQueueStatus_T, OpenDynamicBufQueue, const std::string &,
const std::function<void(void *, int32_t)>);
HANDLER_DEFINE(device::BlockQueueStatus_T, Open, const std::string &, const std::function<void(void *, int32_t)>);
HANDLER_DEFINE(bool, IsClosed);
HANDLER_DEFINE(void, CloseConfirm);
HANDLER_DEFINE(void, Close, const std::string &);
HANDLER_DEFINE(device::BlockQueueStatus_T, Clear, const std::string &);
HANDLER_DEFINE(device::BlockQueueStatus_T, Push, const std::string &, const std::vector<device::DataQueueItem> &,
unsigned int);
};
} // namespace mindspore
#endif // MINDSPORE_CCSRC_INCLUDE_COMMON_UTILS_DATA_QUEUE_HANDLER_H_
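A minimal sketch of how dataset-side code can reach the device queue through this indirection without linking against the runtime; the channel name is an illustrative assumption, and the concrete handlers are installed by the DataQueueHandlerRegister in runtime/data_queue/data_queue_mgr.cc shown earlier in this change.

#include <vector>
#include "utils/data_queue_handler.h"
bool SendRow(const std::vector<mindspore::device::DataQueueItem> &row) {
  using mindspore::DataQueueHandler;
  if (DataQueueHandler::IsClosed()) {
    return false;  // the training side has already asked the pipeline to stop
  }
  auto status = DataQueueHandler::Push("example_queue", row, /*timeout_in_sec=*/30);
  return status == mindspore::device::SUCCESS;
}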

View File

@ -26,10 +26,15 @@
#include "utils/visible.h"
#include "ir/scope.h"
#include "utils/trace_info.h"
namespace mindspore {
enum SourceLineTip { kSourceLineTipDiscard = 0, kSourceLineTipNextLine = 1, kSourceLineTipInLine = 2 };
typedef enum CacheBool { UNCACHED = -1, FALSE, TRUE } CacheBool;
// typedef enum CacheBool { UNCACHED = -1, FALSE, TRUE } CacheBool;
using CacheBool = int32_t;
const CacheBool Uncached = -1;
const CacheBool False = 0;
const CacheBool True = 1;
// Location class record the location in source code.
class Location {
public:

View File

@ -979,7 +979,7 @@ class _CellGraphExecutor:
self._graph_executor.set_kernel_build_server_dir(os.path.split(kernel_build_server.__file__)[0] + os.sep)
def init_dataset(self, queue_name, dataset_size, batch_size, dataset_types, dataset_shapes,
input_indexs, phase='dataset'):
input_indexs, phase='dataset', need_run=True):
"""
Initialization interface for calling data subgraph.
@ -1001,7 +1001,8 @@ class _CellGraphExecutor:
types=dataset_types,
shapes=dataset_shapes,
input_indexs=input_indexs,
phase=phase):
phase=phase,
need_run=need_run):
raise RuntimeError("Failure to init and dataset subgraph!")
self._graph_executor.set_queue_name(queue_name)
return True

View File

@ -120,7 +120,8 @@ def set_seed(seed):
if not isinstance(seed, int) or isinstance(seed, bool):
raise TypeError("seed isn't of type int.")
if seed < 0 or seed > UINT32_MAX:
raise ValueError("seed given is not within the required range [0, UINT32_MAX(4294967295)].")
raise ValueError(
"seed given is not within the required range [0, UINT32_MAX(4294967295)].")
_config.set_seed(seed)
random.seed(seed)
# numpy.random isn't thread safe
@ -167,7 +168,8 @@ def set_prefetch_size(size):
if not isinstance(size, int) or isinstance(size, bool):
raise TypeError("size isn't of type int.")
if size <= 0 or size > INT32_MAX:
raise ValueError("size is not within the required range (0, INT32_MAX(2147483647)].")
raise ValueError(
"size is not within the required range (0, INT32_MAX(2147483647)].")
_config.set_op_connector_size(size)
@ -281,7 +283,8 @@ def set_monitor_sampling_interval(interval):
if not isinstance(interval, int) or isinstance(interval, bool):
raise TypeError("interval isn't of type int.")
if interval <= 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range (0, INT32_MAX(2147483647)].")
raise ValueError(
"Interval given is not within the required range (0, INT32_MAX(2147483647)].")
_config.set_monitor_sampling_interval(interval)
@ -508,16 +511,20 @@ def set_enable_autotune(enable, filepath_prefix=None):
save_autoconfig = bool(enable and filepath_prefix is not None)
if filepath_prefix and not isinstance(filepath_prefix, str):
raise TypeError("json_filepath must be a str value but was: {}.".format(filepath_prefix))
raise TypeError(
"json_filepath must be a str value but was: {}.".format(filepath_prefix))
if enable and filepath_prefix == "":
raise RuntimeError("The value of json_filepath cannot be the empty string.")
raise RuntimeError(
"The value of json_filepath cannot be the empty string.")
if not enable and filepath_prefix is not None:
logger.warning("The value of json_filepath is ignored when enable is False.")
logger.warning(
"The value of json_filepath is ignored when enable is False.")
if enable and filepath_prefix is None:
logger.warning("Dataset AutoTune is enabled but no json path is specified, check INFO log for tuned result.")
logger.warning(
"Dataset AutoTune is enabled but no json path is specified, check INFO log for tuned result.")
json_filepath = replace_none(filepath_prefix, "")
@ -587,7 +594,8 @@ def set_autotune_interval(interval):
if not isinstance(interval, int) or isinstance(interval, bool):
raise TypeError("interval must be of type int.")
if interval < 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range [0, INT32_MAX(2147483647)].")
raise ValueError(
"Interval given is not within the required range [0, INT32_MAX(2147483647)].")
_config.set_autotune_interval(interval)
@ -621,7 +629,8 @@ def get_enable_shared_mem():
"""
# For Windows and MacOS we forbid shared mem function temporarily
if platform.system().lower() in {"windows", "darwin"}:
logger.warning("For Windows and MacOS we forbid shared mem function temporarily.")
logger.warning(
"For Windows and MacOS we forbid shared mem function temporarily.")
return False
return _config.get_enable_shared_mem()
@ -765,7 +774,8 @@ def set_multiprocessing_timeout_interval(interval):
if not isinstance(interval, int) or isinstance(interval, bool):
raise TypeError("interval isn't of type int.")
if interval <= 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range (0, INT32_MAX(2147483647)).")
raise ValueError(
"Interval given is not within the required range (0, INT32_MAX(2147483647)).")
_config.set_multiprocessing_timeout_interval(interval)
@ -785,3 +795,33 @@ def get_multiprocessing_timeout_interval():
>>> multiprocessing_timeout_interval = ds.config.get_multiprocessing_timeout_interval()
"""
return _config.get_multiprocessing_timeout_interval()
def set_dynamic_shape(is_dynamic):
"""
Set the dynamic shape flag of the dataset.
Args:
is_dynamic (bool): Whether the dataset uses dynamic shape. Default: False.
Raises:
TypeError: If `is_dynamic` is not a boolean data type.
Examples:
>>> ds.config.set_dynamic_shape(True)
"""
if not isinstance(is_dynamic, bool):
raise TypeError("is_dynamic must be a boolean dtype.")
_config.set_dynamic_shape(is_dynamic)
def get_dynamic_shape():
"""
Get the dynamic shape flag of the dataset.
Returns:
bool, whether the dataset uses dynamic shape.
Examples:
>>> is_dynamic_shape = ds.config.get_dynamic_shape()
"""
return _config.get_dynamic_shape()

View File

@ -57,26 +57,24 @@ def _get_types_and_shapes(dataset):
return dataset_types, dataset_shapes
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset', create_data_info_queue=False):
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset', create_data_info_queue=False, is_dynamic_shape=False):
"""Initialize and execute the dataset graph."""
batch_size = exec_dataset.get_batch_size()
input_indexs = exec_dataset.input_indexs
# transform data format
dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)
if exec_dataset.dynamic_setting[0]:
_, dataset_shapes = exec_dataset.dynamic_min_max_shapes()
send_epoch_end = bool(dataset_size == -1)
exec_dataset = exec_dataset.device_que(send_epoch_end=send_epoch_end, create_data_info_queue=create_data_info_queue)
need_run = not is_dynamic_shape
_cell_graph_executor.init_dataset(exec_dataset.queue_name,
dataset_size,
batch_size,
dataset_types,
dataset_shapes,
input_indexs,
phase=phase)
phase=phase,
need_run=need_run)
return exec_dataset

View File

@ -19,6 +19,7 @@ from mindspore._checkparam import Validator
from mindspore.common.dtype import pytype_to_dtype
from mindspore.common.api import _cell_graph_executor
from mindspore.dataset.engine import offload
import mindspore.dataset as ds
from .. import context, nn
from ._utils import _exec_datagraph, _get_types_and_shapes, _construct_tensor_list
from ..parallel._utils import _get_device_num, _get_global_rank, _need_to_full, _to_full_shapes, _get_pipeline_stages
@ -43,12 +44,14 @@ def _send_data_no_flag(dataset, epoch_num):
def _dynamic_sink_data(dataset, dataset_iter):
"""Special scenario for dataset with sink_size=1."""
_, dataset_shapes = dataset_iter.types_shapes()
if hasattr(dataset_iter, "sink_size") and \
dataset_iter.sink_size == 1 and \
dataset.get_dataset_size() != 1 and \
hasattr(dataset_iter, "sink_count") and \
dataset_iter.sink_count == 1 and \
context.get_context("device_target") == "Ascend":
context.get_context("device_target") == "Ascend" and \
not _has_dynamic_shape(dataset_shapes):
return True
return False
@ -79,12 +82,14 @@ class _DataWrapper(nn.Cell):
"""
def __init__(self, network, dataset_types, dataset_shapes, queue_name, min_shapes=None, max_shapes=None):
super(_DataWrapper, self).__init__(auto_prefix=False, flags=network.get_flags())
super(_DataWrapper, self).__init__(
auto_prefix=False, flags=network.get_flags())
# Also copy the flag in `network` construct
flags = getattr(network.__class__.construct, "_mindspore_flags", {})
self.info = (dataset_types, dataset_shapes)
self.add_flags(**flags)
self.get_next = P.GetNext(dataset_types, dataset_shapes, len(dataset_types), queue_name)
self.get_next = P.GetNext(
dataset_types, dataset_shapes, len(dataset_types), queue_name)
if min_shapes is not None and max_shapes is not None:
Validator.check_value_type("min_shapes", min_shapes, [list, tuple])
Validator.check_value_type("max_shapes", max_shapes, [list, tuple])
@ -100,7 +105,8 @@ class _DataWrapper(nn.Cell):
def _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name,
min_shapes=None, max_shapes=None):
if not isinstance(network, _DataWrapper):
network = _DataWrapper(network, dataset_types, dataset_shapes, queue_name, min_shapes, max_shapes)
network = _DataWrapper(
network, dataset_types, dataset_shapes, queue_name, min_shapes, max_shapes)
return network
@ -175,7 +181,8 @@ def connect_network_with_dataset(network, dataset_helper):
aux = _get_dataset_aux(dataset)
if isinstance(dataset_iter, _DatasetIterNormal):
raise RuntimeError("The API 'connect_network_with_dataset' should be called in dataset sink mode.")
raise RuntimeError(
"The API 'connect_network_with_dataset' should be called in dataset sink mode.")
if _is_role_sched() or (_is_role_pserver() and not _enable_distributed_mindrt()):
return network
@ -184,7 +191,8 @@ def connect_network_with_dataset(network, dataset_helper):
aux.__network__ = network
if aux.__network__ is not network:
raise ValueError("The dataset has been connected to other network, please check the code.")
raise ValueError(
"The dataset has been connected to other network, please check the code.")
queue_name = dataset.__transfer_dataset__.queue_name
if _dynamic_sink_scenario(dataset, dataset_iter) and not context.get_context("enable_ge"):
@ -199,8 +207,10 @@ def connect_network_with_dataset(network, dataset_helper):
device_num = _get_device_num() // _get_pipeline_stages()
dataset_shapes = _to_full_shapes(dataset_shapes, device_num)
network = _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name)
aux.__network_manage__ = aux.__network_manage__ if hasattr(aux, '__network_manage__') else dict()
network = _generate_dataset_sink_mode_net(
network, dataset_shapes, dataset_types, queue_name)
aux.__network_manage__ = aux.__network_manage__ if hasattr(
aux, '__network_manage__') else dict()
aux.__network_manage__[key] = network
return network
@ -208,8 +218,10 @@ def connect_network_with_dataset(network, dataset_helper):
network = aux.__sink_network__
else:
if not context.get_context("enable_ge") and context.get_context("device_target") in ("Ascend", "GPU"):
network = offload.check_add_offload_sink_mode(dataset, dataset_helper, network)
network = _generate_network_with_dataset(network, dataset_helper, queue_name)
network = offload.check_add_offload_sink_mode(
dataset, dataset_helper, network)
network = _generate_network_with_dataset(
network, dataset_helper, queue_name)
aux.__sink_network__ = network
if _dynamic_sink_data(dataset, dataset_iter) and _dynamic_sink_exception_scenario(dataset_iter):
@ -262,7 +274,8 @@ class DatasetHelper:
dataset_sink_mode = Validator.check_bool(dataset_sink_mode)
Validator.check_is_int(sink_size)
if sink_size < -1 or sink_size == 0:
raise ValueError("The 'sink_size' must be -1 or positive, but got sink_size {}.".format(sink_size))
raise ValueError(
"The 'sink_size' must be -1 or positive, but got sink_size {}.".format(sink_size))
if sink_size == -1:
sink_size = dataset.get_dataset_size()
@ -337,7 +350,7 @@ class DatasetHelper:
def _reset(self, step):
"""Reset the dataset to the provided step."""
self.iter._reset(step) # pylint: disable=W0212
self.iter._reset(step) # pylint: disable=W0212
def get_data_info(self):
"""
@ -370,23 +383,29 @@ class _DatasetIter:
self.dataset = dataset
self.sink_size = sink_size
self.sink_count = self.get_sink_count(dataset)
self.dataset_types, self.dataset_shapes = _get_types_and_shapes(
dataset)
self.dynamic_shape = _has_dynamic_shape(self.dataset_shapes) or ds.config.get_dynamic_shape()
if self.dynamic_shape:
ds.config.set_dynamic_shape(True)
if not hasattr(dataset, '__transfer_dataset__'):
if hasattr(dataset, '__loop_size__'):
# PS mode does not support loop sink and need get the real sink size.
if not (_is_role_worker() and _is_ps_mode()):
self.sink_size = dataset.__loop_size__
create_data_info_queue = (sink_size == 1 and self.sink_count == 1 and dataset.get_dataset_size() != 1
and context.get_context("device_target") == "Ascend")
and context.get_context("device_target") == "Ascend" and not self.dynamic_shape)
dataset.__transfer_dataset__ = _exec_datagraph(dataset, self.sink_size,
create_data_info_queue=create_data_info_queue)
create_data_info_queue=create_data_info_queue,
is_dynamic_shape=self.dynamic_shape)
if not hasattr(dataset, '__no_send__'):
_send_data(dataset, epoch_num)
else:
# if using an existed __transfer_dataset__, set the queue_name directly
if not dataset.__transfer_dataset__.queue_name:
_cell_graph_executor.set_queue_name(dataset.__transfer_dataset__.queue_name)
_cell_graph_executor.set_queue_name(
dataset.__transfer_dataset__.queue_name)
_send_data_no_flag(dataset, epoch_num)
self.stop_send = dataset.__transfer_dataset__.stop_send
@ -394,7 +413,6 @@ class _DatasetIter:
self.continue_send = dataset.__transfer_dataset__.continue_send
self.get_data_info = dataset.__transfer_dataset__.get_data_info
self.dynamic_min_max_shapes = dataset.dynamic_min_max_shapes
self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset)
if hasattr(dataset.__transfer_dataset__, "_reset"):
self._reset = dataset.__transfer_dataset__._reset # pylint: disable=W0212
@ -452,7 +470,8 @@ class _DatasetIterGE(_DatasetIter):
batch_expand_num = 1
if _need_to_full():
batch_expand_num = _get_device_num() // _get_pipeline_stages()
tensor_list_run = _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num)
tensor_list_run = _construct_tensor_list(
self.dataset_types, self.dataset_shapes, batch_expand_num)
def op():
return tensor_list_run
@ -487,7 +506,8 @@ class _DatasetIterMSLoopSink(_DatasetIter):
# compile is device_number times the batch dimension of tensors for run. Now only support LoopSink.
if _need_to_full():
device_num = _get_device_num() // _get_pipeline_stages()
self.dataset_shapes = _to_full_shapes(self.dataset_shapes, device_num)
self.dataset_shapes = _to_full_shapes(
self.dataset_shapes, device_num)
def op():
return tuple()
@ -533,7 +553,8 @@ class _DatasetIterNormal:
self.dataset = dataset
self.device_num = _get_device_num()
self.global_rank = _get_global_rank()
self.iter = self.dataset.create_tuple_iterator(num_epochs=epoch_num, do_copy=True)
self.iter = self.dataset.create_tuple_iterator(
num_epochs=epoch_num, do_copy=True)
def __iter__(self):
return self

View File

@ -316,7 +316,8 @@ add_library(backend_static STATIC
$<TARGET_OBJECTS:_mindspore_runtime_device_obj>
$<TARGET_OBJECTS:_mindspore_runtime_graph_scheduler_obj>
$<TARGET_OBJECTS:_mindspore_runtime_hardware_obj>
$<TARGET_OBJECTS:_mindspore_runtime_pynative_obj>)
$<TARGET_OBJECTS:_mindspore_runtime_pynative_obj>
$<TARGET_OBJECTS:_mindspore_runtime_data_queue_obj>)
target_link_libraries(ut_tests PRIVATE mindspore securec -Wl,--start-group proto_input mindspore::protobuf
backend_static -Wl,--end-group)
target_link_libraries(ut_tests PRIVATE mindspore::grpc++)