From cf829d84156af2e45ca91285d30707ed8a3db02c Mon Sep 17 00:00:00 2001 From: wilfChen Date: Sat, 26 Mar 2022 10:16:06 +0800 Subject: [PATCH] fix gpu buffer reentry issue --- .../engine/datasetops/device_queue_op.cc | 24 ++- .../engine/datasetops/device_queue_op.h | 2 +- .../device/gpu/hal/device/blocking_queue.h | 2 +- .../device/gpu/hal/device/gpu_buffer_mgr.cc | 154 +++++++----------- .../device/gpu/hal/device/gpu_buffer_mgr.h | 33 ++-- .../gpu/kernel/data/dataset_init_kernel.cc | 2 +- .../kernel/data/dataset_iterator_kernel.cc | 24 +-- .../gpu/kernel/data/dataset_iterator_kernel.h | 2 +- 8 files changed, 99 insertions(+), 144 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index cc5a23b0ead..7c6e3c4ce10 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -442,10 +442,9 @@ Status DeviceQueueOp::PushDataToGPU() { auto items = std::move(item.data_item); bool eoe_flag = item.eoe_flag; int64_t send_batch = 0; - uint32_t handle = INVALID_HANDLE; auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2); - handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, {}, release_function); - if (handle == INVALID_HANDLE) { + auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function); + if (ret != BlockQueueStatus_T::SUCCESS) { return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] Failed to open channel for sending data."); } @@ -463,7 +462,7 @@ Status DeviceQueueOp::PushDataToGPU() { return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__, "[Internal ERROR] Failed to prefetch data in current PS mode(cache data when sending)."); } - RETURN_IF_NOT_OK(RetryPushData(handle, items, is_profiling_enable, &push_cost)); + RETURN_IF_NOT_OK(RetryPushData(items, is_profiling_enable, &push_cost)); #ifndef ENABLE_SECURITY ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, push_cost, &batch_start_time, &end_time, gpu_connector_->capacity(), gpu_connector_->size()); @@ -491,7 +490,7 @@ Status DeviceQueueOp::PushDataToGPU() { eoe_flag = item.eoe_flag; // If the batches send by dataset are more than gpu calculate, gpu will core for no signal notify. if (rc.IsError()) { - GpuBufferMgr::GetInstance().Close(handle); + GpuBufferMgr::GetInstance().Close(channel_name_); GpuBufferMgr::GetInstance().CloseConfirm(); return rc; } @@ -507,13 +506,12 @@ Status DeviceQueueOp::PushDataToGPU() { tree_->SetFinished(); MS_LOG(INFO) << "ExecutionTree finished. Device queue pushed number of batches: " << send_batch; - GpuBufferMgr::GetInstance().Close(handle); + GpuBufferMgr::GetInstance().Close(channel_name_); GpuBufferMgr::GetInstance().CloseConfirm(); return Status::OK(); } -Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector &items, const bool profiling, - uint64_t *push_time) { +Status DeviceQueueOp::RetryPushData(const std::vector &items, const bool profiling, uint64_t *push_time) { bool flag_log = false; #ifndef ENABLE_SECURITY uint64_t start_time = 0; @@ -522,7 +520,7 @@ Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vectorInterrupted()) { - BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); + BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(channel_name_, items, WAIT_TIME); if (ret) { if (ret == BlockQueueStatus_T::ERROR_INPUT) { return Status( @@ -673,16 +671,16 @@ Status DeviceQueueOp::MallocForGPUData(std::vector *items, Status DeviceQueueOp::ClearDevice() { MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_; auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2); - auto handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, {}, release_function); - if (handle == INVALID_HANDLE) { + auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function); + if (ret != BlockQueueStatus_T::SUCCESS) { return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] Failed to open channel for clearing the device."); } - BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Clear(handle); + ret = GpuBufferMgr::GetInstance().Clear(channel_name_); CHECK_FAIL_RETURN_UNEXPECTED(!ret, "Failed to clear the device."); - GpuBufferMgr::GetInstance().Close(handle); + GpuBufferMgr::GetInstance().Close(channel_name_); GpuBufferMgr::GetInstance().CloseConfirm(); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index e33b153b9a1..b81ef58ff18 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -145,7 +145,7 @@ class DeviceQueueOp : public PipelineOp { #ifdef ENABLE_GPUQUE Status SendDataToGPU(); Status MallocForGPUData(std::vector *items, const TensorRow &curr_row, const int32_t &worker_id); - Status RetryPushData(unsigned int handle, const std::vector &data, bool profiling, uint64_t *push_time); + Status RetryPushData(const std::vector &data, bool profiling, uint64_t *push_time); void ReleaseData(void *addr, int32_t worker_id); Status LaunchParallelCopyThread(); Status PushDataToGPU(); diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/device/blocking_queue.h b/mindspore/ccsrc/plugin/device/gpu/hal/device/blocking_queue.h index e32ad3c2581..2b2763bd3fa 100644 --- a/mindspore/ccsrc/plugin/device/gpu/hal/device/blocking_queue.h +++ b/mindspore/ccsrc/plugin/device/gpu/hal/device/blocking_queue.h @@ -30,7 +30,7 @@ namespace mindspore { namespace device { -enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT }; +enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, QUEUE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT }; struct DataItemGpu { int32_t worker_id_{0}; diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.cc b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.cc index 249335cd861..3a9cb254262 100644 --- a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.cc +++ b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.cc @@ -26,67 +26,51 @@ namespace py = pybind11; namespace mindspore { namespace device { -static unsigned int AllocHandle() { - static std::atomic handle(1); - return handle.fetch_add(1, std::memory_order_relaxed); -} - GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept { static GpuBufferMgr instance; return instance; } -BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::string &channel_name, void *addr, - const std::vector &shape, const size_t &capacity) { - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (name_queue_map_.count(name)) { - MS_LOG(ERROR) << "Queue already exist: " << name; +BlockQueueStatus_T GpuBufferMgr::Create(const std::string &channel_name, void *addr, const std::vector &shape, + const size_t &capacity) { + MS_LOG(INFO) << "Gpu queue: " << channel_name << " created."; + if (name_queue_map_.count(channel_name)) { + MS_LOG(ERROR) << "Queue already exist: " << channel_name; return QUEUE_EXIST; } std::shared_ptr queue = std::make_shared(); BlockQueueStatus_T rt = queue->Create(addr, shape, capacity); if (rt != SUCCESS) { + MS_LOG(ERROR) << "Queue: " << channel_name << "create failed: " << rt; return rt; } - (void)name_queue_map_.insert(std::make_pair(name, queue)); + (void)name_queue_map_.insert(std::make_pair(channel_name, queue)); init_ = true; return SUCCESS; } -unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, - const std::vector &shape, const std::function func) { +BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector &shape, + const std::function func) { + MS_LOG(INFO) << "Gpu queue: " << channel_name << " open."; set_device(); - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (!name_queue_map_.count(name)) { - MS_LOG(ERROR) << "Queue not exist " << name; - return INVALID_HANDLE; + if (!name_queue_map_.count(channel_name)) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } - unsigned int handle = AllocHandle(); - if (handle == INVALID_HANDLE) { - MS_LOG(ERROR) << "handle is invalid"; - return INVALID_HANDLE; - } - (void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name])); - name_queue_map_[name]->RegisterRelease(func); + + name_queue_map_[channel_name]->RegisterRelease(func); open_by_dataset_++; - return handle; + return SUCCESS; } -unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, - const std::vector &shape) { +BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector &shape) { + MS_LOG(INFO) << "Gpu queue: " << channel_name << " open."; set_device(); - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (!name_queue_map_.count(name)) { - MS_LOG(ERROR) << "Queue not exist " << name; - return INVALID_HANDLE; + if (!name_queue_map_.count(channel_name)) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } - unsigned int handle = AllocHandle(); - if (handle == INVALID_HANDLE) { - MS_LOG(ERROR) << "handle is invalid"; - return INVALID_HANDLE; - } - (void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name])); - return handle; + return SUCCESS; } void GpuBufferMgr::set_device_id(int device_id) { cur_dev_id_ = device_id; } @@ -105,44 +89,48 @@ void GpuBufferMgr::set_device() const { } } -BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, const std::vector &data, +BlockQueueStatus_T GpuBufferMgr::Push(const std::string &channel_name, const std::vector &data, unsigned int timeout_in_sec) { - auto iter = handle_queue_map_.find(handle); - if (iter == handle_queue_map_.end()) { - return HANDLE_NOT_EXIST; + auto iter = name_queue_map_.find(channel_name); + if (iter == name_queue_map_.end()) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } return iter->second->Push(data, timeout_in_sec); } -BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, std::vector *data) { - auto iter = handle_queue_map_.find(handle); - if (iter == handle_queue_map_.end()) { - return HANDLE_NOT_EXIST; +BlockQueueStatus_T GpuBufferMgr::Front(const std::string &channel_name, std::vector *data) { + auto iter = name_queue_map_.find(channel_name); + if (iter == name_queue_map_.end()) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } + return iter->second->Front(data); } -BlockQueueStatus_T GpuBufferMgr::Pop(unsigned int handle) { - auto iter = handle_queue_map_.find(handle); - if (iter == handle_queue_map_.end()) { - return HANDLE_NOT_EXIST; +BlockQueueStatus_T GpuBufferMgr::Pop(const std::string &channel_name) { + auto iter = name_queue_map_.find(channel_name); + if (iter == name_queue_map_.end()) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } + return iter->second->Pop(); } -BlockQueueStatus_T GpuBufferMgr::Clear(unsigned int handle) { - auto iter = handle_queue_map_.find(handle); - if (iter == handle_queue_map_.end()) { - return HANDLE_NOT_EXIST; +BlockQueueStatus_T GpuBufferMgr::Clear(const std::string &channel_name) { + auto iter = name_queue_map_.find(channel_name); + if (iter == name_queue_map_.end()) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; + return QUEUE_NOT_EXIST; } + return iter->second->Clear(); } -void GpuBufferMgr::Close(unsigned int handle) noexcept { - if (!handle_queue_map_.count(handle)) { - return; - } - (void)handle_queue_map_.erase(handle); +void GpuBufferMgr::Close(const std::string &channel_name) noexcept { + MS_LOG(INFO) << "Close the queue: " << channel_name; return; } @@ -151,6 +139,7 @@ bool GpuBufferMgr::IsInit() const { return init_; } bool GpuBufferMgr::IsClosed() const { return closed_; } bool GpuBufferMgr::Destroy() { + MS_LOG(INFO) << "Destroy all GPU queue."; for (auto iter = name_queue_map_.begin(); iter != name_queue_map_.end(); ++iter) { std::shared_ptr queue = iter->second; if (queue != nullptr) { @@ -164,9 +153,8 @@ bool GpuBufferMgr::Destroy() { return true; } -inline bool GpuBufferMgr::isCreated(unsigned int device_id, const std::string &channel_name) { - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (name_queue_map_.count(name) != 0) { +inline bool GpuBufferMgr::isCreated(const std::string &channel_name) { + if (name_queue_map_.count(channel_name) != 0) { return true; } return false; @@ -195,46 +183,20 @@ bool GpuBufferMgr::CloseNotify() { void GpuBufferMgr::CloseConfirm() { sema.Signal(); } -size_t GpuBufferMgr::Size(unsigned int handle) { - if (handle == INVALID_HANDLE) { - MS_LOG(ERROR) << "handle is invalid"; +size_t GpuBufferMgr::Size(const std::string &channel_name) { + if (!name_queue_map_.count(channel_name)) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; return 0; } - if (handle_queue_map_.count(handle) == 0) { - MS_LOG(ERROR) << "Handle not exist " << handle; - return 0; - } - return handle_queue_map_.at(handle)->Size(); + return name_queue_map_.at(channel_name)->Size(); } -size_t GpuBufferMgr::Size(unsigned int device_id, const std::string &channel_name) { - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (!name_queue_map_.count(name)) { - MS_LOG(ERROR) << "Queue not exist " << name; +size_t GpuBufferMgr::Capacity(const std::string &channel_name) { + if (!name_queue_map_.count(channel_name)) { + MS_LOG(ERROR) << "Queue not exist " << channel_name; return 0; } - return name_queue_map_.at(name)->Size(); -} - -size_t GpuBufferMgr::Capacity(unsigned int handle) { - if (handle == INVALID_HANDLE) { - MS_LOG(ERROR) << "handle is invalid"; - return 0; - } - if (handle_queue_map_.count(handle) == 0) { - MS_LOG(ERROR) << "Handle not exist " << handle; - return 0; - } - return handle_queue_map_.at(handle)->Capacity(); -} - -size_t GpuBufferMgr::Capacity(unsigned int device_id, const std::string &channel_name) { - std::string name = std::to_string(device_id) + std::string("_") + channel_name; - if (!name_queue_map_.count(name)) { - MS_LOG(ERROR) << "Queue not exist " << name; - return 0; - } - return name_queue_map_.at(name)->Capacity(); + return name_queue_map_.at(channel_name)->Capacity(); } } // namespace device } // namespace mindspore diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.h b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.h index 02558cad13b..98f9d2e2f5b 100644 --- a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.h +++ b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_buffer_mgr.h @@ -62,33 +62,31 @@ class Semaphore { class GpuBufferMgr { public: - static const unsigned int INVALID_HANDLE = 0xffffffffUL; - EXPORT GpuBufferMgr() : cur_dev_id_(0), init_(false), closed_(false), open_by_dataset_(0) {} EXPORT virtual ~GpuBufferMgr() = default; EXPORT static GpuBufferMgr &GetInstance() noexcept; - EXPORT BlockQueueStatus_T Create(unsigned int device_id, const std::string &channel_name, void *addr, - const std::vector &shape, const size_t &capacity); + EXPORT BlockQueueStatus_T Create(const std::string &channel_name, void *addr, const std::vector &shape, + const size_t &capacity); // call for Push thread - EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector &shape, - std::function func); + EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector &shape, + std::function func); // call for Front/Pop thread - EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector &shape); + EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector &shape); - EXPORT BlockQueueStatus_T Push(unsigned int handle, const std::vector &data, + EXPORT BlockQueueStatus_T Push(const std::string &channel_name, const std::vector &data, unsigned int timeout_in_sec); - EXPORT BlockQueueStatus_T Front(unsigned int handle, std::vector *data); - EXPORT BlockQueueStatus_T Pop(unsigned int handle); - EXPORT BlockQueueStatus_T Clear(unsigned int handle); + EXPORT BlockQueueStatus_T Front(const std::string &channel_name, std::vector *data); + EXPORT BlockQueueStatus_T Pop(const std::string &channel_name); + EXPORT BlockQueueStatus_T Clear(const std::string &channel_name); EXPORT void set_device_id(int device_id); - EXPORT void Close(unsigned int handle) noexcept; + EXPORT void Close(const std::string &channel_name) noexcept; EXPORT bool IsInit() const; @@ -102,13 +100,9 @@ class GpuBufferMgr { // call for dataset send thread EXPORT void CloseConfirm(); - EXPORT size_t Size(unsigned int handle); + EXPORT size_t Size(const std::string &channel_name); - EXPORT size_t Size(unsigned int device_id, const std::string &channel_name); - - EXPORT size_t Capacity(unsigned int handle); - - EXPORT size_t Capacity(unsigned int device_id, const std::string &channel_name); + EXPORT size_t Capacity(const std::string &channel_name); private: void set_device() const; @@ -122,10 +116,9 @@ class GpuBufferMgr { int open_by_dataset_; Semaphore sema; - std::map> handle_queue_map_; std::map> name_queue_map_; - inline bool isCreated(unsigned int device_id, const std::string &channel_name); + inline bool isCreated(const std::string &channel_name); GpuBufferMgr(const GpuBufferMgr &) = delete; GpuBufferMgr &operator=(const GpuBufferMgr &) = delete; diff --git a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_init_kernel.cc b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_init_kernel.cc index c0ad10f608f..4a4f76bdc91 100644 --- a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_init_kernel.cc +++ b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_init_kernel.cc @@ -59,7 +59,7 @@ bool DatasetInitKernelMod::Launch(const std::vector &, const std::ve << len << "]."; } - auto status = GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, shapes_, buffer_q_capacity_); + auto status = GpuBufferMgr::GetInstance().Create(queue_name_, addr, shapes_, buffer_q_capacity_); if (status) { MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', init Dataset Failed. len: " << len << ", status:" << status; } diff --git a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.cc b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.cc index cd5e4b92a21..deb98238b87 100644 --- a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.cc +++ b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.cc @@ -37,9 +37,9 @@ namespace kernel { using mindspore::device::GpuBufferMgr; DatasetIteratorKernelMod::DatasetIteratorKernelMod() - : handle_(GpuBufferMgr::INVALID_HANDLE), profiling_enable_(false), profiling_op_(nullptr) {} + : is_opened_(false), profiling_enable_(false), profiling_op_(nullptr) {} -DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { GpuBufferMgr::GetInstance().Close(handle_); } +DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { GpuBufferMgr::GetInstance().Close(queue_name_); } bool DatasetIteratorKernelMod::Init(const CNodePtr &kernel_node) { MS_EXCEPTION_IF_NULL(kernel_node); @@ -92,10 +92,10 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector *data) { profiling_enable_ = profiler_inst->GetEnableFlag(); if (profiling_enable_) { start_time_stamp = profiling_op_->GetTimeStamp(); - queue_size = GpuBufferMgr::GetInstance().Size(handle_); + queue_size = GpuBufferMgr::GetInstance().Size(queue_name_); } #endif - auto ret = GpuBufferMgr::GetInstance().Front(handle_, data); + auto ret = GpuBufferMgr::GetInstance().Front(queue_name_, data); if (ret == device::SUCCESS) { #ifndef ENABLE_SECURITY if (profiling_enable_) { @@ -115,7 +115,7 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector *data) { #ifdef ENABLE_DUMP_IR mindspore::RDR::TriggerAll(); #endif - MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', get data timeout"; + MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', get data timeout. Queue name: " << queue_name_; } } #ifndef ENABLE_SECURITY @@ -124,7 +124,8 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector *data) { profiling_op_->RecordData(queue_size, start_time_stamp, end_time_stamp); } #endif - MS_LOG(ERROR) << "For '" << kernel_name_ << "', get data failed, errcode " << ret; + MS_LOG(ERROR) << "For '" << kernel_name_ << "', get data failed, errcode " << ret + << ", queue name: " << queue_name_; return false; } return true; @@ -132,11 +133,12 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector *data) { bool DatasetIteratorKernelMod::Launch(const std::vector &, const std::vector &, const std::vector &outputs, void *stream) { - if (handle_ == GpuBufferMgr::INVALID_HANDLE) { - handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, output_size_list_); - if (handle_ == GpuBufferMgr::INVALID_HANDLE) { - MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', gpu Queue(" << queue_name_ << ") Open Failed"; + if (!is_opened_) { + auto ret = GpuBufferMgr::GetInstance().Open(queue_name_, output_size_list_); + if (ret != device::BlockQueueStatus_T::SUCCESS) { + MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', gpu Queue(" << queue_name_ << ") Open Failed: " << ret; } + is_opened_ = true; } if (!ReadDevice(&output_data_)) { @@ -155,7 +157,7 @@ bool DatasetIteratorKernelMod::Launch(const std::vector &, const std CHECK_CUDA_RET_WITH_EXCEPT(kernel_node_, cudaStreamSynchronize(reinterpret_cast(stream)), "cudaStreamSynchronize failed"); - (void)GpuBufferMgr::GetInstance().Pop(handle_); + (void)GpuBufferMgr::GetInstance().Pop(queue_name_); return true; } diff --git a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.h b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.h index b25952a4e24..69b35e8d6ff 100644 --- a/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.h +++ b/mindspore/ccsrc/plugin/device/gpu/kernel/data/dataset_iterator_kernel.h @@ -44,7 +44,7 @@ class DatasetIteratorKernelMod : public NativeGpuKernelMod { private: bool ReadDevice(std::vector *data); std::string queue_name_; - unsigned int handle_; + bool is_opened_; bool profiling_enable_; std::shared_ptr profiling_op_; std::vector types_;