diff --git a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc
index 2c91d362592..064cd0cf6d4 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc
+++ b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc
@@ -141,19 +141,19 @@ Status DeviceQueueOp::SendDataToGPU() {
     for (int row_id = 0;
          row_id < current_buffer->NumRows() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) {
       RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row));
-      if (curr_row.size() < 2) {
-        return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid tensor size");
+
+      std::vector<size_t> data_size;
+      for (size_t i = 0; i < curr_row.size(); i++) {
+        data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
       }
-      uint32_t feature_size = static_cast<uint32_t>(curr_row[0]->SizeInBytes());
-      uint32_t label_size = static_cast<uint32_t>(curr_row[1]->SizeInBytes());
       if (!is_open) {
-        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, feature_size, label_size, ReleaseData);
+        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData);
         if (handle == INVALID_HANDLE) {
           return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed");
         }
         is_open = true;
       }
-      RETURN_IF_NOT_OK(RetryPushGPUData(feature_size, label_size, curr_row, handle));
+      RETURN_IF_NOT_OK(RetryPushGPUData(data_size, curr_row, handle));
       total_batch++;
       if (num_batch_ > 0 && total_batch == num_batch_) {
         is_break_loop = true;
@@ -173,16 +173,23 @@ Status DeviceQueueOp::SendDataToGPU() {
   return Status::OK();
 }
 
-Status DeviceQueueOp::RetryPushGPUData(uint32_t feature_size, uint32_t label_size, const TensorRow &curr_row,
+Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row,
                                        uint32_t handle) {
-  unsigned char *feature_addr = nullptr;
-  unsigned char *label_addr = nullptr;
-  while (true && !GpuBufferMgr::GetInstance().IsClosed()) {
-    RETURN_IF_NOT_OK(MallocForGPUData(&feature_addr, feature_size, &label_addr, label_size, curr_row));
-    auto ret = GpuBufferMgr::GetInstance().Push(handle, feature_addr, feature_size, label_addr, label_size, WAIT_TIME);
+  std::vector<device::DataItemGpu> items;
+  for (size_t i = 0; i < data_size.size(); i++) {
+    device::DataItemGpu data_item;
+    data_item.data_len_ = data_size[i];
+    data_item.data_ptr_ = nullptr;
+    items.push_back(data_item);
+  }
+
+  while (!GpuBufferMgr::GetInstance().IsClosed()) {
+    RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row));
+    auto ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
     if (ret) {
-      free(feature_addr);
-      free(label_addr);
+      for (size_t i = 0; i < items.size(); i++) {
+        free(items[i].data_ptr_);
+      }
       MS_LOG(WARNING) << "Retry pushing data...";
       continue;
     } else {
@@ -192,29 +199,20 @@ Status DeviceQueueOp::RetryPushGPUData(uint32_t feature_size, uint32_t label_siz
   return Status::OK();
 }
 
-Status DeviceQueueOp::MallocForGPUData(unsigned char **feature_addr, uint32_t feature_size, unsigned char **label_addr,
-                                       uint32_t label_size, const TensorRow &curr_row) {
-  *feature_addr = (unsigned char *)malloc(feature_size);
-  if (*feature_addr == nullptr) {
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "feature memory malloc failed.");
-  }
-  (void)memset_s(*feature_addr, feature_size, 0, feature_size);
-  unsigned char *feature = curr_row[0]->StartAddr();
-  if (memcpy_s(*feature_addr, feature_size, feature, static_cast<size_t>(curr_row[0]->SizeInBytes())) != 0) {
-    MS_LOG(ERROR) << "Feature memcpy_s failed!";
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "feature memcpy_s failed.");
-  }
-
-  *label_addr = (unsigned char *)malloc(label_size);
-  if (*label_addr == nullptr) {
-    free(*feature_addr);
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "label memory malloc failed.");
-  }
-  (void)memset_s(*label_addr, label_size, 0, label_size);
-  unsigned char *label = curr_row[1]->StartAddr();
-  if (memcpy_s(*label_addr, label_size, label, static_cast<size_t>(curr_row[1]->SizeInBytes())) != 0) {
-    MS_LOG(ERROR) << "Label memcpy_s failed!";
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "label memcpy_s failed.");
+Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
+  size_t i = 0;
+  for (auto &sub_item : *items) {
+    sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_);
+    if (sub_item.data_ptr_ == nullptr) {
+      return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed.");
+    }
+    (void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_);
+    unsigned char *column_data = curr_row[i]->StartAddr();
+    if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
+                 static_cast<size_t>(curr_row[i++]->SizeInBytes())) != 0) {
+      MS_LOG(ERROR) << "memcpy_s failed!";
+      return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memcpy_s failed.");
+    }
   }
 
   return Status::OK();
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h
index 11ff1bde95a..8856cc4460a 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h
+++ b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h
@@ -145,9 +145,8 @@ class DeviceQueueOp : public PipelineOp {
 #ifdef ENABLE_GPUQUE
   Status SendDataToGPU();
-  Status RetryPushGPUData(uint32_t feature_size, uint32_t label_size, const TensorRow &curr_row, uint32_t handle);
-  Status MallocForGPUData(unsigned char **feature_addr, uint32_t feature_size, unsigned char **label_addr,
-                          uint32_t label_size, const TensorRow &curr_row);
+  Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle);
+  Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
 #endif
 
   Status SendDataToCPU();
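The fixed feature/label pair becomes one `DataItemGpu` per dataset column: the push path stages each column into its own host buffer before handing the whole row to `GpuBufferMgr`. A minimal self-contained sketch of that staging pattern, with a hypothetical `Column` struct standing in for the real `TensorRow` entries (unlike the operator above, this sketch also unwinds earlier allocations on failure):

```cpp
#include <cstdlib>
#include <cstring>
#include <vector>

struct DataItemGpu {
  size_t data_len_;
  void *data_ptr_;
};

// Hypothetical stand-in for one dataset column of a TensorRow.
struct Column {
  const unsigned char *src;  // host source address
  size_t size;               // size in bytes
};

// Stage every column into its own malloc'd host buffer.
bool StageColumns(const std::vector<Column> &row, std::vector<DataItemGpu> *items) {
  items->clear();
  for (const auto &col : row) {
    void *buf = malloc(col.size);
    if (buf == nullptr) {
      for (auto &it : *items) free(it.data_ptr_);  // unwind earlier columns
      items->clear();
      return false;
    }
    memcpy(buf, col.src, col.size);
    items->push_back({col.size, buf});
  }
  return true;
}
```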
diff --git a/mindspore/ccsrc/device/gpu/blocking_queue.cc b/mindspore/ccsrc/device/gpu/blocking_queue.cc
index c36b1cdbf54..3b5e75f551d 100644
--- a/mindspore/ccsrc/device/gpu/blocking_queue.cc
+++ b/mindspore/ccsrc/device/gpu/blocking_queue.cc
@@ -21,64 +21,49 @@
 namespace mindspore {
 namespace device {
-GpuQueue::GpuQueue(void *addr, size_t feature_size, size_t label_size, size_t capacity)
-    : buffer_(addr),
-      head_(0),
-      tail_(0),
-      feature_size_(feature_size),
-      label_size_(label_size),
-      capacity_(capacity),
-      stream_(0),
-      node_info_(nullptr) {
+GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
+    : buffer_(addr), head_(0), tail_(0), shape_(shape), len_(0), capacity_(capacity), stream_(0), node_info_(nullptr) {
   CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
   node_info_ = std::make_unique<NodeInfo[]>(capacity);
+  for (auto item : shape) {
+    len_ += item;
+  }
 }
 
 GpuQueue::~GpuQueue() { buffer_ = nullptr; }
 
-BlockQueueStatus_T GpuQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size) {
-  if ((feature_addr == nullptr) || (label_addr == nullptr)) {
-    MS_LOG(ERROR) << "input nullptr";
-    return ERROR_INPUT;
+BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
+  size_t offset = 0;
+  for (size_t i = 0; i < data.size(); i++) {
+    auto item = data[i];
+    if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
+      MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
+      return ERROR_INPUT;
+    }
+
+    void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
+    CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
+                              "Cuda Memcpy Error");
+
+    offset += item.data_len_;
   }
-  if ((feature_size != feature_size_) || (label_size != label_size_)) {
-    MS_LOG(ERROR) << "Data input error. Input data size: (" << feature_size << ", " << label_size << "), with ("
-                  << feature_size_ << ", " << label_size_ << ") expect";
-    return ERROR_INPUT;
-  }
-  void *feature_start_addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * (feature_size + label_size);
-  if (feature_start_addr == nullptr) {
-    MS_LOG(ERROR) << "feature start addr is nullptr";
-    return INTERNAL_ERROR;
-  }
-  CHECK_CUDA_RET_WITH_ERROR(
-    cudaMemcpyAsync(feature_start_addr, feature_addr, feature_size, cudaMemcpyHostToDevice, stream_),
-    "Cuda Memcpy Error");
-  void *label_start_addr = reinterpret_cast<unsigned char *>(feature_start_addr) + feature_size;
-  if (label_start_addr == nullptr) {
-    MS_LOG(ERROR) << "label start addr is nullptr";
-    return INTERNAL_ERROR;
-  }
-  CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(label_start_addr, label_addr, label_size, cudaMemcpyHostToDevice, stream_),
-                            "Cuda Memcpy Error");
+
   node_info_[tail_].event_.reset(new cudaEvent_t());
   CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
-  node_info_[tail_].host_feature_addr_ = feature_addr;
-  node_info_[tail_].host_label_addr_ = label_addr;
+  node_info_[tail_].data_ = data;
   tail_ = (tail_ + 1) % (capacity_);
   return SUCCESS;
 }
 
-BlockQueueStatus_T GpuQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
-                                   size_t *label_size) const {
+BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
   CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
   CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
-  *feature_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_);
-  *feature_size = feature_size_;
-  *label_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_) + feature_size_;
-  *label_size = label_size_;
-  host_release_(node_info_[head_].host_feature_addr_);
-  host_release_(node_info_[head_].host_label_addr_);
+  *addr = (unsigned char *)buffer_ + head_ * len_;
+  *len = len_;
+
+  for (auto item : node_info_[head_].data_) {
+    host_release_(item.data_ptr_);
+  }
 
   return SUCCESS;
 }
@@ -100,26 +85,25 @@ bool GpuQueue::Destroy() {
   }
 }
 
-BlockQueueStatus_T BlockingQueue::Create(void *addr, size_t feature_size, size_t label_size, size_t capacity) {
+BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
   if (addr == nullptr) {
     MS_LOG(ERROR) << "addr is nullptr";
     return INTERNAL_ERROR;
   }
-  queue_ = std::make_shared<GpuQueue>(addr, feature_size, label_size, capacity);
+  queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
   return SUCCESS;
 }
 
 void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
 
-BlockQueueStatus_T BlockingQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size,
-                                       unsigned int timeout_in_sec) {
+BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec) {
   std::unique_lock<std::mutex> locker(mutex_);
   if (queue_->IsFull()) {
     if (not_full_cond_.wait_for(locker, std::chrono::seconds(timeout_in_sec)) == std::cv_status::timeout) {
       return TIMEOUT;
     }
   }
-  auto ret = queue_->Push(feature_addr, feature_size, label_addr, label_size);
+  auto ret = queue_->Push(data);
   if (ret) {
     return ret;
   }
@@ -127,15 +111,14 @@ BlockQueueStatus_T BlockingQueue::Push(void *feature_addr, size_t feature_size,
   return SUCCESS;
 }
 
-BlockQueueStatus_T BlockingQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
-                                        size_t *label_size) {
+BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
   std::unique_lock<std::mutex> locker(mutex_);
   bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
   if (!timeout) {
     return TIMEOUT;
   }
 
-  return queue_->Front(feature_addr, feature_size, label_addr, label_size);
+  return queue_->Front(addr, len);
 }
 
 BlockQueueStatus_T BlockingQueue::Pop() {
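`Push` now packs all columns of one row back to back inside a single ring-buffer slot: slot `tail_` starts at byte `tail_ * len_`, a running `offset` places each column, and the CUDA event tracked per slot lets `Front` wait until the async copies have landed before the host buffers are released. The slot arithmetic as a small self-contained sketch (names are illustrative):

```cpp
#include <cstddef>
#include <vector>

// Byte offset of column `col` inside ring-buffer slot `slot`, when each
// slot holds all columns of one row packed contiguously.
size_t SlotOffset(const std::vector<size_t> &col_sizes, size_t slot, size_t col) {
  size_t row_len = 0;  // len_: sum of the per-column sizes
  for (size_t s : col_sizes) row_len += s;
  size_t offset = 0;  // running offset of `col` within the row
  for (size_t i = 0; i < col; i++) offset += col_sizes[i];
  return slot * row_len + offset;
}
// e.g. col_sizes = {600, 8}: SlotOffset(col_sizes, 2, 1) == 2 * 608 + 600 == 1816.
```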
diff --git a/mindspore/ccsrc/device/gpu/blocking_queue.h b/mindspore/ccsrc/device/gpu/blocking_queue.h
index a1594c21a97..77744bce316 100644
--- a/mindspore/ccsrc/device/gpu/blocking_queue.h
+++ b/mindspore/ccsrc/device/gpu/blocking_queue.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <mutex>
 #include <condition_variable>
+#include <vector>
 #include <functional>
 #include <cuda_runtime_api.h>
 
@@ -31,9 +32,14 @@
 namespace mindspore {
 namespace device {
 enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_NOT_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
 
+struct DataItemGpu {
+  size_t data_len_;
+  void *data_ptr_;
+};
+
 class GpuQueue {
  public:
-  GpuQueue(void *addr, size_t feature_size, size_t label_size, size_t capacity);
+  GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
   virtual ~GpuQueue();
 
   void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; }
 
@@ -41,23 +47,22 @@ class GpuQueue {
   inline bool IsEmpty() const { return head_ == tail_; }
   inline bool IsFull() const { return head_ == ((tail_ + 1) % (capacity_)); }
 
-  BlockQueueStatus_T Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size);
-  BlockQueueStatus_T Front(void **feature_addr, size_t *feature_size, void **label_addr, size_t *label_size) const;
+  BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data);
+  BlockQueueStatus_T Front(void **ptr, size_t *len) const;
   BlockQueueStatus_T Pop();
   bool Destroy();
 
  private:
   struct NodeInfo {
     std::unique_ptr<cudaEvent_t> event_;
-    void *host_feature_addr_;
-    void *host_label_addr_;
+    std::vector<DataItemGpu> data_;
   };
 
   void *buffer_;
   size_t head_;
   size_t tail_;
-  size_t feature_size_;
-  size_t label_size_;
+  std::vector<size_t> shape_;
+  size_t len_;
   size_t capacity_;
   cudaStream_t stream_;
   std::unique_ptr<NodeInfo[]> node_info_;
@@ -72,11 +77,10 @@ class BlockingQueue {
   BlockingQueue() : queue_(nullptr) {}
   ~BlockingQueue() = default;
 
-  BlockQueueStatus_T Create(void *addr, size_t feature_size, size_t label_size, size_t capacity);
+  BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
   void RegisterRelease(const std::function<void(void *)> &func);
-  BlockQueueStatus_T Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size,
-                          unsigned int timeout_in_sec);
-  BlockQueueStatus_T Front(void **feature_addr, size_t *feature_size, void **label_addr, size_t *label_size);
+  BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec);
+  BlockQueueStatus_T Front(void **ptr, size_t *len);
   BlockQueueStatus_T Pop();
   bool Destroy();
diff --git a/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.cc b/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.cc
index bfffb9fc059..9c2e1366f30 100644
--- a/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.cc
+++ b/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.cc
@@ -45,14 +45,14 @@ GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept {
 }
 
 BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::string &channel_name, void *addr,
-                                        const size_t &feature_len, const size_t &label_size, const size_t &capacity) {
+                                        const std::vector<size_t> &shape, const size_t &capacity) {
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (name_queue_map_.count(name)) {
     MS_LOG(ERROR) << "Queue not exist " << name;
     return QUEUE_NOT_EXIST;
   }
   std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
-  BlockQueueStatus_T rt = queue->Create(addr, feature_len, label_size, capacity);
+  BlockQueueStatus_T rt = queue->Create(addr, shape, capacity);
   if (rt != SUCCESS) {
     return rt;
   }
@@ -61,8 +61,8 @@ BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::strin
   return SUCCESS;
 }
 
-unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, const size_t &, const size_t &,
-                                const std::function<void(void *)> func) {
+unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
+                                const std::vector<size_t> &shape, const std::function<void(void *)> func) {
   set_device();
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (!name_queue_map_.count(name)) {
@@ -80,8 +80,8 @@ unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &chann
   return handle;
 }
 
-unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, const size_t &,
-                                const size_t &) {
+unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
+                                const std::vector<size_t> &shape) {
   set_device();
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (!name_queue_map_.count(name)) {
@@ -106,22 +106,21 @@ void GpuBufferMgr::set_device() const {
   }
 }
 
-BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, void *feature_addr, size_t feature_size, void *label_addr,
-                                      size_t label_size, unsigned int timeout_in_sec) {
+BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, const std::vector<DataItemGpu> &data,
+                                      unsigned int timeout_in_sec) {
   auto iter = handle_queue_map_.find(handle);
   if (iter == handle_queue_map_.end()) {
     return HANDLE_NOT_EXIST;
   }
-  return iter->second->Push(feature_addr, feature_size, label_addr, label_size, timeout_in_sec);
+  return iter->second->Push(data, timeout_in_sec);
 }
 
-BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, void **feature_addr, size_t *feature_size,
-                                       void **label_addr, size_t *label_size) {
+BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, void **addr, size_t *len) {
  auto iter = handle_queue_map_.find(handle);
   if (iter == handle_queue_map_.end()) {
     return HANDLE_NOT_EXIST;
   }
-  return iter->second->Front(feature_addr, feature_size, label_addr, label_size);
+  return iter->second->Front(addr, len);
 }
 
 BlockQueueStatus_T GpuBufferMgr::Pop(unsigned int handle) {
diff --git a/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.h b/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.h
index 447564914a1..bcc01a6988d 100644
--- a/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.h
+++ b/mindspore/ccsrc/device/gpu/gpu_buffer_mgr.h
@@ -22,6 +22,7 @@
 #include <unistd.h>
 #include <string>
 #include <map>
+#include <vector>
 #include <memory>
 #include <functional>
 #include "device/gpu/blocking_queue.h"
@@ -80,20 +81,18 @@ class GpuBufferMgr {
   EXPORT static GpuBufferMgr &GetInstance() noexcept;
 
   EXPORT BlockQueueStatus_T Create(unsigned int device_id, const std::string &channel_name, void *addr,
-                                   const size_t &feature_len, const size_t &label_size, const size_t &capacity);
+                                   const std::vector<size_t> &shape, const size_t &capacity);
 
   // call for Push thread
-  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const size_t &feature_len,
-                           const size_t &label_size, std::function<void(void *)> func);
+  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape,
+                           std::function<void(void *)> func);
 
   // call for Front/Pop thread
-  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const size_t &feature_len,
-                           const size_t &label_size);
+  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape);
 
-  EXPORT BlockQueueStatus_T Push(unsigned int handle, void *feature_addr, size_t feature_size, void *label_addr,
-                                 size_t label_size, unsigned int timeout_in_sec);
-  EXPORT BlockQueueStatus_T Front(unsigned int handle, void **feature_addr, size_t *feature_size, void **label_addr,
-                                  size_t *label_size);
+  EXPORT BlockQueueStatus_T Push(unsigned int handle, const std::vector<DataItemGpu> &data,
+                                 unsigned int timeout_in_sec);
+  EXPORT BlockQueueStatus_T Front(unsigned int handle, void **addr, size_t *len);
 
   EXPORT BlockQueueStatus_T Pop(unsigned int handle);
 
   EXPORT void set_device_id(int device_id);
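Producer-side usage of the reworked manager API, as a hedged sketch: the channel name, column sizes, and release callback are illustrative, the channel is assumed to have been created already, and error handling is elided.

```cpp
#include <cstdlib>
#include <vector>
#include "device/gpu/gpu_buffer_mgr.h"

using mindspore::device::DataItemGpu;
using mindspore::device::GpuBufferMgr;

// Illustrative only: open a two-column channel and push one row of
// malloc'd host buffers; `free` serves as the host-release callback.
void PushOneRow(void *img_host, void *lbl_host) {
  std::vector<size_t> sizes = {600, 8};  // hypothetical per-column byte sizes
  unsigned int handle = GpuBufferMgr::GetInstance().Open(0, "demo_queue", sizes, free);
  std::vector<DataItemGpu> row = {{sizes[0], img_host}, {sizes[1], lbl_host}};
  (void)GpuBufferMgr::GetInstance().Push(handle, row, 30);  // 30 s timeout
}
```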
diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.cc b/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.cc
index bdf56d7827d..d87a6cc7261 100644
--- a/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.cc
+++ b/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.cc
@@ -15,6 +15,7 @@
  */
 
 #include "kernel/gpu/data/dataset_init_kernel.h"
+#include "kernel/gpu/data/dataset_utils.h"
 #include "device/gpu/gpu_buffer_mgr.h"
 #include "device/gpu/gpu_memory_allocator.h"
 #include "utils/convert_utils.h"
@@ -23,7 +24,7 @@ namespace mindspore {
 namespace kernel {
 using mindspore::device::GpuBufferMgr;
 
-DatasetInitKernel::DatasetInitKernel() : feature_size_(0), label_size_(0) {}
+DatasetInitKernel::DatasetInitKernel() : total_bytes_(0) {}
 
 const std::vector<size_t> &DatasetInitKernel::GetInputSizeList() const { return input_size_list_; }
 
@@ -31,39 +32,21 @@ const std::vector<size_t> &DatasetInitKernel::GetOutputSizeList() const { return
 
 const std::vector<size_t> &DatasetInitKernel::GetWorkspaceSizeList() const { return workspace_size_list_; }
 
-size_t DatasetInitKernel::TensorSize(std::vector<int> &shape) const {
-  if (shape.size() == 0) {
-    return 0;
-  }
-
-  int size = 1;
-  for (size_t i = 0; i < shape.size(); i++) {
-    size *= shape[i];
-  }
-
-  return IntToSize(size);
-}
-
 bool DatasetInitKernel::Init(const CNodePtr &kernel_node) {
   queue_name_ = GetAttr<std::string>(kernel_node, "queue_name");
   auto shapes = GetAttr<const std::vector<std::vector<int>>>(kernel_node, "shapes");
-  auto data_num = shapes.size();
-  if (data_num != 2) {
-    MS_LOG(EXCEPTION) << "Invalid Shapes " << data_num;
-  }
-
-  auto &feature_Shapes = shapes[0];
-  auto size = TensorSize(feature_Shapes);
-  feature_size_ = size * sizeof(float);
-
   auto types = GetAttr<const std::vector<TypePtr>>(kernel_node, "types");
-  if ((types[1]->type_id() != kNumberTypeInt32) && (types[1]->type_id() != kNumberTypeInt64)) {
-    MS_LOG(EXCEPTION) << "Invalid types " << types[1]->type_id();
+  if (shapes.size() != types.size()) {
+    MS_LOG(EXCEPTION) << "Invalid shapes: " << shapes << ", types: " << types;
   }
 
-  size_t label_unit = (types[1]->type_id() == kNumberTypeInt32) ? sizeof(int32_t) : sizeof(int64_t);
-  size = TensorSize(shapes[1]);
-  label_size_ = size * label_unit;
+  for (size_t i = 0; i < shapes.size(); i++) {
+    int unit = UnitSizeInBytes(types[i]->type_id());
+    int nums = ElementNums(shapes[i]);
+    int bytes = unit * nums;
+    shapes_.push_back(bytes);
+    total_bytes_ += bytes;
+  }
 
   return true;
 }
@@ -72,17 +55,15 @@ void DatasetInitKernel::InitSizeLists() { return; }
 
 bool DatasetInitKernel::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
                                const std::vector<AddressPtr> &, uintptr_t) {
   void *addr = nullptr;
-  size_t len = (feature_size_ + label_size_) * buffer_q_capacity_;
+  size_t len = total_bytes_ * buffer_q_capacity_;
 
   if (!device::gpu::GPUMemoryAllocator::GetInstance().AllocBufferQueueMem(len, &addr)) {
     MS_LOG(EXCEPTION) << "Memory not enough: failed to allocate GPU buffer queue memory[" << len << "].";
   }
 
-  auto status =
-    GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, feature_size_, label_size_, buffer_q_capacity_);
+  auto status = GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, shapes_, buffer_q_capacity_);
   if (status) {
-    MS_LOG(EXCEPTION) << "Init Dataset Failed: " << queue_name_ << ", " << feature_size_ << ", " << label_size_ << ", "
-                      << status;
+    MS_LOG(EXCEPTION) << "Init Dataset Failed. len: " << len << ", status:" << status;
   }
 
   return true;
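Per column, the buffer size is element count times dtype width. For hypothetical attributes `shapes = {{32, 3, 224, 224}, {32}}` and `types = {float32, int32}`, the loop above yields 32 * 3 * 224 * 224 * 4 = 19267584 bytes for the first column and 32 * 4 = 128 bytes for the second, so `shapes_` holds {19267584, 128} and `total_bytes_` is 19267712.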
diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.h b/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.h
index 7750fb8da6d..f7ffb419956 100644
--- a/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.h
+++ b/mindspore/ccsrc/kernel/gpu/data/dataset_init_kernel.h
@@ -40,11 +40,9 @@ class DatasetInitKernel : public GpuKernel {
   void InitSizeLists() override;
 
  private:
-  size_t TensorSize(std::vector<int> &) const;
-
   std::string queue_name_;
-  size_t feature_size_;
-  size_t label_size_;
+  std::vector<size_t> shapes_;
+  size_t total_bytes_;
 
   std::vector<size_t> input_size_list_;
   std::vector<size_t> output_size_list_;
diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.cc b/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.cc
index 342bfb98e3f..f8ee1340189 100644
--- a/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.cc
+++ b/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.cc
@@ -15,21 +15,19 @@
  */
 
 #include "kernel/gpu/data/dataset_iterator_kernel.h"
-
 #include <cuda_runtime_api.h>
 #include <string>
 #include <vector>
-
 #include "device/gpu/gpu_buffer_mgr.h"
 #include "device/gpu/gpu_common.h"
+#include "kernel/gpu/data/dataset_utils.h"
 
 namespace mindspore {
 namespace kernel {
 using mindspore::device::GpuBufferMgr;
 using mindspore::device::HandleMgr;
 
-DatasetIteratorKernel::DatasetIteratorKernel()
-    : output_num_(0), handle_(HandleMgr::INVALID_HANDLE), feature_size_(0), label_size_(0) {}
+DatasetIteratorKernel::DatasetIteratorKernel() : handle_(HandleMgr::INVALID_HANDLE), total_bytes_(0) {}
 
 DatasetIteratorKernel::~DatasetIteratorKernel() { GpuBufferMgr::GetInstance().Close(handle_); }
 
@@ -39,65 +37,40 @@ const std::vector<size_t> &DatasetIteratorKernel::GetOutputSizeList() const { re
 
 const std::vector<size_t> &DatasetIteratorKernel::GetWorkspaceSizeList() const { return workspace_size_list_; }
 
-size_t DatasetIteratorKernel::TensorSize(std::vector<int> &shape) const {
-  if (shape.size() == 0) {
-    return 0;
-  }
-
-  int size = 1;
-  for (size_t i = 0; i < shape.size(); i++) {
-    size *= shape[i];
-  }
-
-  return IntToSize(size);
-}
-
 bool DatasetIteratorKernel::Init(const CNodePtr &kernel_node) {
-  output_num_ = GetAttr<int>(kernel_node, "output_num");
   queue_name_ = GetAttr<std::string>(kernel_node, "shared_name");
   auto shapes = GetAttr<const std::vector<std::vector<int>>>(kernel_node, "shapes");
-  auto data_num = shapes.size();
-  if (data_num != 2) {
-    MS_LOG(EXCEPTION) << "Invalid Shapes " << data_num;
-  }
-
-  auto &feature_Shapes = shapes[0];
-  auto size = TensorSize(feature_Shapes);
-  feature_size_ = size * sizeof(float);
-
   auto types = GetAttr<const std::vector<TypePtr>>(kernel_node, "types");
-  if ((types[1]->type_id() != kNumberTypeInt32) && (types[1]->type_id() != kNumberTypeInt64)) {
-    MS_LOG(EXCEPTION) << "Invalid types " << types[1]->type_id();
+  if (shapes.size() != types.size()) {
+    MS_LOG(EXCEPTION) << "Invalid shapes: " << shapes << ", types: " << types;
   }
 
-  size_t label_unit = (types[1]->type_id() == kNumberTypeInt32) ? sizeof(int32_t) : sizeof(int64_t);
-  size = TensorSize(shapes[1]);
-  label_size_ = size * label_unit;
+  for (size_t i = 0; i < shapes.size(); i++) {
+    int unit = UnitSizeInBytes(types[i]->type_id());
+    int nums = ElementNums(shapes[i]);
+    int bytes = unit * nums;
+    output_size_list_.push_back(bytes);
+    total_bytes_ += bytes;
+  }
 
-  InitSizeLists();
-
-  handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, feature_size_, label_size_);
+  handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, output_size_list_);
   if (handle_ == HandleMgr::INVALID_HANDLE) {
-    MS_LOG(EXCEPTION) << "Gpu Queue(" << queue_name_ << ") Open Failed: feature_size(" << feature_size_
-                      << "), label_size(" << label_size_ << ")";
+    MS_LOG(EXCEPTION) << "Gpu Queue(" << queue_name_ << ") Open Failed";
   }
 
   return true;
 }
 
-void DatasetIteratorKernel::InitSizeLists() {
-  output_size_list_.push_back(feature_size_);
-  output_size_list_.push_back(label_size_);
-}
+void DatasetIteratorKernel::InitSizeLists() { return; }
 
 bool DatasetIteratorKernel::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
                                    const std::vector<AddressPtr> &outputs, uintptr_t) {
-  void *feature_addr{nullptr}, *label_addr{nullptr};
-  size_t feature_size{0}, label_size{0};
+  void *addr = nullptr;
+  size_t len = 0;
 
   int repeat = 0;
   while (true) {
-    auto ret = GpuBufferMgr::GetInstance().Front(handle_, &feature_addr, &feature_size, &label_addr, &label_size);
+    auto ret = GpuBufferMgr::GetInstance().Front(handle_, &addr, &len);
     if (ret == device::SUCCESS) {
       break;
     }
@@ -117,19 +90,18 @@ bool DatasetIteratorKernel::Launch(const std::vector<AddressPtr> &, const std::v
     return false;
   }
 
-  if (feature_size != feature_size_ || label_size != label_size_) {
-    MS_LOG(ERROR) << "DatasetIteratorKernel: Front Error: " << feature_addr << ", " << feature_size << ", "
-                  << label_addr << ", " << label_size;
+  if (total_bytes_ != len) {
+    MS_LOG(ERROR) << "Dataset front error. read: " << len << ", expect: " << total_bytes_;
     return false;
   }
 
-  CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[0]->addr, feature_addr, feature_size, cudaMemcpyDeviceToDevice),
-                             "Cuda Memcpy Failed");
-  CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[1]->addr, label_addr, label_size, cudaMemcpyDeviceToDevice),
-                             "Cuda Memcpy Failed");
+  for (size_t i = 0; i < output_size_list_.size(); i++) {
+    CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[i]->addr, addr, output_size_list_[i], cudaMemcpyDeviceToDevice),
+                               "Cuda Memcpy Failed");
+    addr = reinterpret_cast<unsigned char *>(addr) + output_size_list_[i];
+  }
 
   (void)GpuBufferMgr::GetInstance().Pop(handle_);
-
   return true;
 }
 }  // namespace kernel
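On the consumer side, `Launch` walks the packed row with a byte cursor, copying one slice per kernel output. The same walk as a minimal standalone CUDA sketch (function and parameter names are hypothetical):

```cpp
#include <cuda_runtime_api.h>
#include <vector>

// Scatter a packed device buffer `src` into one device buffer per column,
// advancing the cursor by each column's byte size.
cudaError_t ScatterColumns(const void *src, const std::vector<size_t> &sizes,
                           const std::vector<void *> &dsts) {
  const unsigned char *cursor = static_cast<const unsigned char *>(src);
  for (size_t i = 0; i < sizes.size(); i++) {
    cudaError_t err = cudaMemcpy(dsts[i], cursor, sizes[i], cudaMemcpyDeviceToDevice);
    if (err != cudaSuccess) {
      return err;
    }
    cursor += sizes[i];
  }
  return cudaSuccess;
}
```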
read: " << len << ", expect: " << total_bytes_ << ", "; return false; } - CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[0]->addr, feature_addr, feature_size, cudaMemcpyDeviceToDevice), - "Cuda Memcpy Failed"); - CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[1]->addr, label_addr, label_size, cudaMemcpyDeviceToDevice), - "Cuda Memcpy Failed"); + for (size_t i = 0; i < output_size_list_.size(); i++) { + CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[i]->addr, addr, output_size_list_[i], cudaMemcpyDeviceToDevice), + "Cuda Memcpy Failed"); + addr = reinterpret_cast(addr) + output_size_list_[i]; + } (void)GpuBufferMgr::GetInstance().Pop(handle_); - return true; } } // namespace kernel diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.h b/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.h index 5819a87b936..d3231cab3cf 100644 --- a/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.h +++ b/mindspore/ccsrc/kernel/gpu/data/dataset_iterator_kernel.h @@ -40,14 +40,9 @@ class DatasetIteratorKernel : public GpuKernel { void InitSizeLists() override; private: - size_t TensorSize(std::vector &) const; - std::string queue_name_; - int output_num_; unsigned int handle_; - - size_t feature_size_; - size_t label_size_; + size_t total_bytes_; std::vector input_size_list_; std::vector output_size_list_; diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_utils.cc b/mindspore/ccsrc/kernel/gpu/data/dataset_utils.cc new file mode 100644 index 00000000000..42c322069ff --- /dev/null +++ b/mindspore/ccsrc/kernel/gpu/data/dataset_utils.cc @@ -0,0 +1,70 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+
+#include "kernel/gpu/data/dataset_utils.h"
+
+namespace mindspore {
+namespace kernel {
+
+size_t UnitSizeInBytes(const mindspore::TypeId &t) {
+  size_t bytes = 0;
+  switch (t) {
+    case kNumberTypeBool:
+    case kNumberTypeInt8:
+    case kNumberTypeUInt8:
+      bytes = 1;
+      break;
+    case kNumberTypeInt16:
+    case kNumberTypeUInt16:
+    case kNumberTypeFloat16:
+      bytes = 2;
+      break;
+    case kNumberTypeInt:
+    case kNumberTypeUInt:
+    case kNumberTypeInt32:
+    case kNumberTypeUInt32:
+    case kNumberTypeFloat:
+    case kNumberTypeFloat32:
+      bytes = 4;
+      break;
+    case kNumberTypeUInt64:
+    case kNumberTypeInt64:
+    case kNumberTypeFloat64:
+      bytes = 8;
+      break;
+    default:
+      MS_LOG(EXCEPTION) << "Invalid types " << t;
+      break;
+  }
+
+  return bytes;
+}
+
+int ElementNums(const std::vector<int> &shape) {
+  if (shape.size() == 0) {
+    return 0;
+  }
+
+  int nums = 1;
+  for (size_t i = 0; i < shape.size(); i++) {
+    nums *= shape[i];
+  }
+
+  return nums;
+}
+
+}  // namespace kernel
+}  // namespace mindspore
diff --git a/mindspore/ccsrc/kernel/gpu/data/dataset_utils.h b/mindspore/ccsrc/kernel/gpu/data/dataset_utils.h
new file mode 100644
index 00000000000..7fe7c7ee1d2
--- /dev/null
+++ b/mindspore/ccsrc/kernel/gpu/data/dataset_utils.h
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_
+#define MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_
+
+#include <vector>
+#include "ir/dtype/type.h"
+namespace mindspore {
+namespace kernel {
+
+size_t UnitSizeInBytes(const mindspore::TypeId &t);
+int ElementNums(const std::vector<int> &shape);
+}  // namespace kernel
+}  // namespace mindspore
+#endif  // MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_
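A quick sanity check of the two helpers, as a hedged usage sketch (assumes the new header is on the include path and the program links against the project):

```cpp
#include <cassert>
#include <vector>
#include "kernel/gpu/data/dataset_utils.h"

int main() {
  // float32 elements are 4 bytes; a {32, 10} shape holds 320 elements,
  // so one such column occupies 320 * 4 = 1280 bytes.
  assert(mindspore::kernel::UnitSizeInBytes(mindspore::kNumberTypeFloat32) == 4);
  assert(mindspore::kernel::ElementNums(std::vector<int>{32, 10}) == 320);
  return 0;
}
```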