gpu queue support multi-inputs

wilfChen 2020-05-11 20:19:47 +08:00
parent 4a8fcf5d76
commit ccf6dabe13
12 changed files with 245 additions and 218 deletions
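The commit replaces the fixed (feature, label) pair threaded through the GPU data queue with an arbitrary list of input tensors described by DataItemGpu. A minimal producer-side sketch of the new flow, with names taken from the diff below; channel_name, ReleaseData, WAIT_TIME and the byte counts are assumed surrounding context, not part of this commit:

// Producer-side sketch only (assumed context noted above).
std::vector<size_t> data_size = {features_bytes, labels_bytes};  // one entry per input column

unsigned int handle = GpuBufferMgr::GetInstance().Open(0, channel_name, data_size, ReleaseData);

std::vector<device::DataItemGpu> items;
for (size_t len : data_size) {
  items.push_back({len, nullptr});  // data_ptr_ is filled in later by MallocForGPUData
}
auto ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);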

device_queue_op.cc (View File)

@@ -141,19 +141,19 @@ Status DeviceQueueOp::SendDataToGPU() {
     for (int row_id = 0;
          row_id < current_buffer->NumRows() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) {
       RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row));
-      if (curr_row.size() < 2) {
-        return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid tensor size");
+      std::vector<size_t> data_size;
+      for (int i = 0; i < curr_row.size(); i++) {
+        data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
       }
-      uint32_t feature_size = static_cast<uint32_t>(curr_row[0]->SizeInBytes());
-      uint32_t label_size = static_cast<uint32_t>(curr_row[1]->SizeInBytes());
       if (!is_open) {
-        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, feature_size, label_size, ReleaseData);
+        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData);
        if (handle == INVALID_HANDLE) {
          return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed");
        }
        is_open = true;
      }
-      RETURN_IF_NOT_OK(RetryPushGPUData(feature_size, label_size, curr_row, handle));
+      RETURN_IF_NOT_OK(RetryPushGPUData(data_size, curr_row, handle));
       total_batch++;
       if (num_batch_ > 0 && total_batch == num_batch_) {
         is_break_loop = true;
@@ -173,16 +173,23 @@ Status DeviceQueueOp::SendDataToGPU() {
   return Status::OK();
 }

-Status DeviceQueueOp::RetryPushGPUData(uint32_t feature_size, uint32_t label_size, const TensorRow &curr_row,
+Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row,
                                        uint32_t handle) {
-  unsigned char *feature_addr = nullptr;
-  unsigned char *label_addr = nullptr;
-  while (true && !GpuBufferMgr::GetInstance().IsClosed()) {
-    RETURN_IF_NOT_OK(MallocForGPUData(&feature_addr, feature_size, &label_addr, label_size, curr_row));
-    auto ret = GpuBufferMgr::GetInstance().Push(handle, feature_addr, feature_size, label_addr, label_size, WAIT_TIME);
+  std::vector<device::DataItemGpu> items;
+  for (int i = 0; i < data_size.size(); i++) {
+    device::DataItemGpu data_item;
+    data_item.data_len_ = data_size[i];
+    data_item.data_ptr_ = nullptr;
+    items.push_back(data_item);
+  }
+
+  while (!GpuBufferMgr::GetInstance().IsClosed()) {
+    RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row));
+    auto ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
     if (ret) {
-      free(feature_addr);
-      free(label_addr);
+      for (int i = 0; i < items.size(); i++) {
+        free(items[i].data_ptr_);
+      }
       MS_LOG(WARNING) << "Retry pushing data...";
       continue;
     } else {
@@ -192,29 +199,20 @@ Status DeviceQueueOp::RetryPushGPUData(uint32_t feature_size, uint32_t label_siz
   return Status::OK();
 }

-Status DeviceQueueOp::MallocForGPUData(unsigned char **feature_addr, uint32_t feature_size, unsigned char **label_addr,
-                                       uint32_t label_size, const TensorRow &curr_row) {
-  *feature_addr = (unsigned char *)malloc(feature_size);
-  if (*feature_addr == nullptr) {
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "feature memory malloc failed.");
-  }
-  (void)memset_s(*feature_addr, feature_size, 0, feature_size);
-  unsigned char *feature = curr_row[0]->StartAddr();
-  if (memcpy_s(*feature_addr, feature_size, feature, static_cast<uint32_t>(curr_row[0]->SizeInBytes())) != 0) {
-    MS_LOG(ERROR) << "Feature memcpy_s failed!";
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "feature memcpy_s failed.");
-  }
-  *label_addr = (unsigned char *)malloc(label_size);
-  if (*label_addr == nullptr) {
-    free(*feature_addr);
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "label memory malloc failed.");
-  }
-  (void)memset_s(*label_addr, label_size, 0, label_size);
-  unsigned char *label = curr_row[1]->StartAddr();
-  if (memcpy_s(*label_addr, label_size, label, static_cast<uint32_t>(curr_row[1]->SizeInBytes())) != 0) {
-    MS_LOG(ERROR) << "Label memcpy_s failed!";
-    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "label memcpy_s failed.");
+Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
+  int i = 0;
+  for (auto &sub_item : *items) {
+    sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_);
+    if (sub_item.data_ptr_ == nullptr) {
+      return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed.");
+    }
+    (void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_);
+    unsigned char *column_data = curr_row[i]->StartAddr();
+    if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
+                 static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
+      MS_LOG(ERROR) << "memcpy_s failed!";
+      return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memcpy_s failed.");
+    }
   }
   return Status::OK();

device_queue_op.h (View File)

@@ -145,9 +145,8 @@ class DeviceQueueOp : public PipelineOp {
 #ifdef ENABLE_GPUQUE
   Status SendDataToGPU();
-  Status RetryPushGPUData(uint32_t feature_size, uint32_t label_size, const TensorRow &curr_row, uint32_t handle);
-  Status MallocForGPUData(unsigned char **feature_addr, uint32_t feature_size, unsigned char **label_addr,
-                          uint32_t label_size, const TensorRow &curr_row);
+  Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle);
+  Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
 #endif

   Status SendDataToCPU();

device/gpu/blocking_queue.cc (View File)

@@ -21,64 +21,49 @@
 namespace mindspore {
 namespace device {
-GpuQueue::GpuQueue(void *addr, size_t feature_size, size_t label_size, size_t capacity)
-    : buffer_(addr),
-      head_(0),
-      tail_(0),
-      feature_size_(feature_size),
-      label_size_(label_size),
-      capacity_(capacity),
-      stream_(0),
-      node_info_(nullptr) {
+GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
+    : buffer_(addr), head_(0), tail_(0), shape_(shape), len_(0), capacity_(capacity), stream_(0), node_info_(nullptr) {
   CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
   node_info_ = std::make_unique<NodeInfo[]>(capacity);
+  for (auto item : shape) {
+    len_ += item;
+  }
 }

 GpuQueue::~GpuQueue() { buffer_ = nullptr; }

-BlockQueueStatus_T GpuQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size) {
-  if ((feature_addr == nullptr) || (label_addr == nullptr)) {
-    MS_LOG(ERROR) << "input nullptr";
-    return ERROR_INPUT;
-  }
-  if ((feature_size != feature_size_) || (label_size != label_size_)) {
-    MS_LOG(ERROR) << "Data input error. Input data size: (" << feature_size << ", " << label_size << "), with ("
-                  << feature_size_ << ", " << label_size_ << ") expect";
-    return ERROR_INPUT;
-  }
-  void *feature_start_addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * (feature_size + label_size);
-  if (feature_start_addr == nullptr) {
-    MS_LOG(ERROR) << "feature start addr is nullptr";
-    return INTERNAL_ERROR;
-  }
-  CHECK_CUDA_RET_WITH_ERROR(
-    cudaMemcpyAsync(feature_start_addr, feature_addr, feature_size, cudaMemcpyHostToDevice, stream_),
-    "Cuda Memcpy Error");
-  void *label_start_addr = reinterpret_cast<unsigned char *>(feature_start_addr) + feature_size;
-  if (label_start_addr == nullptr) {
-    MS_LOG(ERROR) << "label start addr is nullptr";
-    return INTERNAL_ERROR;
-  }
-  CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(label_start_addr, label_addr, label_size, cudaMemcpyHostToDevice, stream_),
-                            "Cuda Memcpy Error");
+BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
+  int offset = 0;
+  for (size_t i = 0; i < data.size(); i++) {
+    auto item = data[i];
+    if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
+      MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
+      return ERROR_INPUT;
+    }
+
+    void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
+    CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
+                              "Cuda Memcpy Error");
+
+    offset += item.data_len_;
+  }
+
   node_info_[tail_].event_.reset(new cudaEvent_t());
   CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
-  node_info_[tail_].host_feature_addr_ = feature_addr;
-  node_info_[tail_].host_label_addr_ = label_addr;
+  node_info_[tail_].data_ = data;
   tail_ = (tail_ + 1) % (capacity_);
   return SUCCESS;
 }

-BlockQueueStatus_T GpuQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
-                                   size_t *label_size) const {
+BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
   CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
   CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
-  *feature_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_);
-  *feature_size = feature_size_;
-  *label_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_) + feature_size_;
-  *label_size = label_size_;
-  host_release_(node_info_[head_].host_feature_addr_);
-  host_release_(node_info_[head_].host_label_addr_);
+  *addr = (unsigned char *)buffer_ + head_ * len_;
+  *len = len_;
+
+  for (auto item : node_info_[head_].data_) {
+    host_release_(item.data_ptr_);
+  }
   return SUCCESS;
 }
@@ -100,26 +85,25 @@ bool GpuQueue::Destroy() {
   }
 }

-BlockQueueStatus_T BlockingQueue::Create(void *addr, size_t feature_size, size_t label_size, size_t capacity) {
+BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
   if (addr == nullptr) {
     MS_LOG(ERROR) << "addr is nullptr";
     return INTERNAL_ERROR;
   }
-  queue_ = std::make_shared<GpuQueue>(addr, feature_size, label_size, capacity);
+  queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
   return SUCCESS;
 }

 void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }

-BlockQueueStatus_T BlockingQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size,
-                                       unsigned int timeout_in_sec) {
+BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec) {
   std::unique_lock<std::mutex> locker(mutex_);
   if (queue_->IsFull()) {
     if (not_full_cond_.wait_for(locker, std::chrono::seconds(timeout_in_sec)) == std::cv_status::timeout) {
       return TIMEOUT;
     }
   }
-  auto ret = queue_->Push(feature_addr, feature_size, label_addr, label_size);
+  auto ret = queue_->Push(data);
   if (ret) {
     return ret;
   }
@@ -127,15 +111,14 @@ BlockQueueStatus_T BlockingQueue::Push(void *feature_addr, size_t feature_size,
   return SUCCESS;
 }

-BlockQueueStatus_T BlockingQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
-                                        size_t *label_size) {
+BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
   std::unique_lock<std::mutex> locker(mutex_);
   bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
   if (!timeout) {
     return TIMEOUT;
   }
-  return queue_->Front(feature_addr, feature_size, label_addr, label_size);
+  return queue_->Front(addr, len);
 }

 BlockQueueStatus_T BlockingQueue::Pop() {
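In the reworked GpuQueue, one ring-buffer slot holds every column of a row back to back: len_ is the sum of the per-column sizes in shape_, and column i starts at the running total of the sizes before it, which is what the tail_ * len_ + offset arithmetic in Push above computes. A self-contained sketch of that addressing, with illustrative sizes:

#include <cstddef>
#include <vector>

// Byte offset of column `col` inside ring slot `slot`, mirroring the
// tail_ * len_ + offset arithmetic in GpuQueue::Push above. Sketch only.
size_t ColumnOffset(const std::vector<size_t> &shape, size_t slot, size_t col) {
  size_t len = 0;
  for (auto s : shape) len += s;               // len_: total bytes per slot
  size_t offset = 0;
  for (size_t i = 0; i < col; i++) offset += shape[i];
  return slot * len + offset;
}
// e.g. shape = {4096, 8}: column 1 of slot 3 starts at 3 * 4104 + 4096.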

device/gpu/blocking_queue.h (View File)

@@ -24,6 +24,7 @@
 #include <mutex>
 #include <cstring>
 #include <string>
+#include <vector>
 #include <condition_variable>
 #include <functional>
@@ -31,9 +32,14 @@ namespace mindspore {
 namespace device {
 enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_NOT_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };

+struct DataItemGpu {
+  size_t data_len_;
+  void *data_ptr_;
+};
+
 class GpuQueue {
  public:
-  GpuQueue(void *addr, size_t feature_size, size_t label_size, size_t capacity);
+  GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
   virtual ~GpuQueue();

   void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; }
@@ -41,23 +47,22 @@ class GpuQueue {
   inline bool IsEmpty() const { return head_ == tail_; }
   inline bool IsFull() const { return head_ == ((tail_ + 1) % (capacity_)); }

-  BlockQueueStatus_T Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size);
-  BlockQueueStatus_T Front(void **feature_addr, size_t *feature_size, void **label_addr, size_t *label_size) const;
+  BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data);
+  BlockQueueStatus_T Front(void **ptr, size_t *len) const;
   BlockQueueStatus_T Pop();
   bool Destroy();

  private:
   struct NodeInfo {
     std::unique_ptr<cudaEvent_t> event_;
-    void *host_feature_addr_;
-    void *host_label_addr_;
+    std::vector<DataItemGpu> data_;
   };

   void *buffer_;
   size_t head_;
   size_t tail_;
-  size_t feature_size_;
-  size_t label_size_;
+  std::vector<size_t> shape_;
+  size_t len_;
   size_t capacity_;
   cudaStream_t stream_;
   std::unique_ptr<NodeInfo[]> node_info_;
@@ -72,11 +77,10 @@ class BlockingQueue {
   BlockingQueue() : queue_(nullptr) {}
   ~BlockingQueue() = default;

-  BlockQueueStatus_T Create(void *addr, size_t feature_size, size_t label_size, size_t capacity);
+  BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
   void RegisterRelease(const std::function<void(void *)> &func);
-  BlockQueueStatus_T Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size,
-                          unsigned int timeout_in_sec);
-  BlockQueueStatus_T Front(void **feature_addr, size_t *feature_size, void **label_addr, size_t *label_size);
+  BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec);
+  BlockQueueStatus_T Front(void **ptr, size_t *len);
   BlockQueueStatus_T Pop();
   bool Destroy();

device/gpu/gpu_buffer_mgr.cc (View File)

@@ -45,14 +45,14 @@ GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept {
 }

 BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::string &channel_name, void *addr,
-                                        const size_t &feature_len, const size_t &label_size, const size_t &capacity) {
+                                        const std::vector<size_t> &shape, const size_t &capacity) {
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (name_queue_map_.count(name)) {
     MS_LOG(ERROR) << "Queue not exist " << name;
     return QUEUE_NOT_EXIST;
   }
   std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
-  BlockQueueStatus_T rt = queue->Create(addr, feature_len, label_size, capacity);
+  BlockQueueStatus_T rt = queue->Create(addr, shape, capacity);
   if (rt != SUCCESS) {
     return rt;
   }
@@ -61,8 +61,8 @@ BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::strin
   return SUCCESS;
 }

-unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, const size_t &, const size_t &,
-                                const std::function<void(void *)> func) {
+unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
+                                const std::vector<size_t> &shape, const std::function<void(void *)> func) {
   set_device();
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (!name_queue_map_.count(name)) {
@@ -80,8 +80,8 @@ unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &chann
   return handle;
 }

-unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, const size_t &,
-                                const size_t &) {
+unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
+                                const std::vector<size_t> &shape) {
   set_device();
   std::string name = std::to_string(device_id) + std::string("_") + channel_name;
   if (!name_queue_map_.count(name)) {
@@ -106,22 +106,21 @@ void GpuBufferMgr::set_device() const {
   }
 }

-BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, void *feature_addr, size_t feature_size, void *label_addr,
-                                      size_t label_size, unsigned int timeout_in_sec) {
+BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, const std::vector<DataItemGpu> &data,
+                                      unsigned int timeout_in_sec) {
   auto iter = handle_queue_map_.find(handle);
   if (iter == handle_queue_map_.end()) {
     return HANDLE_NOT_EXIST;
   }
-  return iter->second->Push(feature_addr, feature_size, label_addr, label_size, timeout_in_sec);
+  return iter->second->Push(data, timeout_in_sec);
 }

-BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, void **feature_addr, size_t *feature_size,
-                                       void **label_addr, size_t *label_size) {
+BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, void **addr, size_t *len) {
   auto iter = handle_queue_map_.find(handle);
   if (iter == handle_queue_map_.end()) {
     return HANDLE_NOT_EXIST;
   }
-  return iter->second->Front(feature_addr, feature_size, label_addr, label_size);
+  return iter->second->Front(addr, len);
 }

 BlockQueueStatus_T GpuBufferMgr::Pop(unsigned int handle) {

device/gpu/gpu_buffer_mgr.h (View File)

@@ -22,6 +22,7 @@
 #include <iostream>
 #include <functional>
 #include <map>
+#include <vector>
 #include <string>
 #include <memory>
 #include "device/gpu/blocking_queue.h"
@@ -80,20 +81,18 @@ class GpuBufferMgr {
   EXPORT static GpuBufferMgr &GetInstance() noexcept;

   EXPORT BlockQueueStatus_T Create(unsigned int device_id, const std::string &channel_name, void *addr,
-                                   const size_t &feature_len, const size_t &label_size, const size_t &capacity);
+                                   const std::vector<size_t> &shape, const size_t &capacity);

   // call for Push thread
-  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const size_t &feature_len,
-                           const size_t &label_size, std::function<void(void *)> func);
+  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape,
+                           std::function<void(void *)> func);

   // call for Front/Pop thread
-  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const size_t &feature_len,
-                           const size_t &label_size);
+  EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape);

-  EXPORT BlockQueueStatus_T Push(unsigned int handle, void *feature_addr, size_t feature_size, void *label_addr,
-                                 size_t label_size, unsigned int timeout_in_sec);
-  EXPORT BlockQueueStatus_T Front(unsigned int handle, void **feature_addr, size_t *feature_size, void **label_addr,
-                                  size_t *label_size);
+  EXPORT BlockQueueStatus_T Push(unsigned int handle, const std::vector<DataItemGpu> &data,
+                                 unsigned int timeout_in_sec);
+  EXPORT BlockQueueStatus_T Front(unsigned int handle, void **addr, size_t *len);
   EXPORT BlockQueueStatus_T Pop(unsigned int handle);

   EXPORT void set_device_id(int device_id);
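Taken together, the manager interface is now shape-driven end to end. A hedged lifecycle sketch under the new signatures; the channel name, capacity, device_buffer and items below are illustrative assumptions, not values from the commit:

// Lifecycle sketch only (illustrative values; device_buffer and items prepared elsewhere).
std::vector<size_t> shape = {1024, 4};                        // bytes per column
auto &mgr = mindspore::device::GpuBufferMgr::GetInstance();
mgr.Create(0, "queue_0", device_buffer, shape, 2);            // device_buffer: preallocated GPU memory
unsigned int producer = mgr.Open(0, "queue_0", shape, free);  // push side; `free` releases host copies
unsigned int consumer = mgr.Open(0, "queue_0", shape);        // front/pop side
mgr.Push(producer, items, 30);                                // items: one DataItemGpu per column
void *row = nullptr;
size_t nbytes = 0;
mgr.Front(consumer, &row, &nbytes);                           // row: all columns back to back on device
mgr.Pop(consumer);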

kernel/gpu/data/dataset_init_kernel.cc (View File)

@@ -15,6 +15,7 @@
  */

 #include "kernel/gpu/data/dataset_init_kernel.h"
+#include "kernel/gpu/data/dataset_utils.h"
 #include "device/gpu/gpu_buffer_mgr.h"
 #include "device/gpu/gpu_memory_allocator.h"
 #include "utils/convert_utils.h"
@@ -23,7 +24,7 @@ namespace mindspore {
 namespace kernel {
 using mindspore::device::GpuBufferMgr;

-DatasetInitKernel::DatasetInitKernel() : feature_size_(0), label_size_(0) {}
+DatasetInitKernel::DatasetInitKernel() : total_bytes_(0) {}

 const std::vector<size_t> &DatasetInitKernel::GetInputSizeList() const { return input_size_list_; }
@@ -31,39 +32,21 @@ const std::vector<size_t> &DatasetInitKernel::GetOutputSizeList() const { return

 const std::vector<size_t> &DatasetInitKernel::GetWorkspaceSizeList() const { return workspace_size_list_; }

-size_t DatasetInitKernel::TensorSize(std::vector<int> &shape) const {
-  if (shape.size() == 0) {
-    return 0;
-  }
-  int size = 1;
-  for (size_t i = 0; i < shape.size(); i++) {
-    size *= shape[i];
-  }
-  return IntToSize(size);
-}
-
 bool DatasetInitKernel::Init(const CNodePtr &kernel_node) {
   queue_name_ = GetAttr<std::string>(kernel_node, "queue_name");
   auto shapes = GetAttr<const std::vector<std::vector<int>>>(kernel_node, "shapes");
-  auto data_num = shapes.size();
-  if (data_num != 2) {
-    MS_LOG(EXCEPTION) << "Invalid Shapes " << data_num;
-  }
-
-  auto &feature_Shapes = shapes[0];
-  auto size = TensorSize(feature_Shapes);
-  feature_size_ = size * sizeof(float);
-
   auto types = GetAttr<const std::vector<TypePtr>>(kernel_node, "types");
-  if ((types[1]->type_id() != kNumberTypeInt32) && (types[1]->type_id() != kNumberTypeInt64)) {
-    MS_LOG(EXCEPTION) << "Invalid types " << types[1]->type_id();
+  if (shapes.size() != types.size()) {
+    MS_LOG(EXCEPTION) << "Invalid shapes: " << shapes << ", types: " << types;
   }

-  size_t label_unit = (types[1]->type_id() == kNumberTypeInt32) ? sizeof(int32_t) : sizeof(int64_t);
-  size = TensorSize(shapes[1]);
-  label_size_ = size * label_unit;
+  for (size_t i = 0; i < shapes.size(); i++) {
+    int unit = UnitSizeInBytes(types[i]->type_id());
+    int nums = ElementNums(shapes[i]);
+    int bytes = unit * nums;
+    shapes_.push_back(bytes);
+    total_bytes_ += bytes;
+  }

   return true;
 }
@@ -72,17 +55,15 @@ void DatasetInitKernel::InitSizeLists() { return; }
 bool DatasetInitKernel::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
                                const std::vector<AddressPtr> &, uintptr_t) {
   void *addr = nullptr;
-  size_t len = (feature_size_ + label_size_) * buffer_q_capacity_;
+  size_t len = total_bytes_ * buffer_q_capacity_;

   if (!device::gpu::GPUMemoryAllocator::GetInstance().AllocBufferQueueMem(len, &addr)) {
     MS_LOG(EXCEPTION) << "Memory not enough: failed to allocate GPU buffer queue memory[" << len << "].";
   }

-  auto status =
-    GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, feature_size_, label_size_, buffer_q_capacity_);
+  auto status = GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, shapes_, buffer_q_capacity_);
   if (status) {
-    MS_LOG(EXCEPTION) << "Init Dataset Failed: " << queue_name_ << ", " << feature_size_ << ", " << label_size_ << ", "
-                      << status;
+    MS_LOG(EXCEPTION) << "Init Dataset Failed. len: " << len << ", status:" << status;
   }

   return true;

kernel/gpu/data/dataset_init_kernel.h (View File)

@@ -40,11 +40,9 @@ class DatasetInitKernel : public GpuKernel {
   void InitSizeLists() override;

  private:
-  size_t TensorSize(std::vector<int> &) const;
-
   std::string queue_name_;
-  size_t feature_size_;
-  size_t label_size_;
+  std::vector<size_t> shapes_;
+  size_t total_bytes_;

   std::vector<size_t> input_size_list_;
   std::vector<size_t> output_size_list_;

kernel/gpu/data/dataset_iterator_kernel.cc (View File)

@@ -15,21 +15,19 @@
  */

 #include "kernel/gpu/data/dataset_iterator_kernel.h"
 #include <cuda_runtime_api.h>
 #include <string>
 #include <vector>
 #include "device/gpu/gpu_buffer_mgr.h"
 #include "device/gpu/gpu_common.h"
+#include "kernel/gpu/data/dataset_utils.h"

 namespace mindspore {
 namespace kernel {
 using mindspore::device::GpuBufferMgr;
 using mindspore::device::HandleMgr;

-DatasetIteratorKernel::DatasetIteratorKernel()
-    : output_num_(0), handle_(HandleMgr::INVALID_HANDLE), feature_size_(0), label_size_(0) {}
+DatasetIteratorKernel::DatasetIteratorKernel() : handle_(HandleMgr::INVALID_HANDLE), total_bytes_(0) {}

 DatasetIteratorKernel::~DatasetIteratorKernel() { GpuBufferMgr::GetInstance().Close(handle_); }
@@ -39,65 +37,40 @@ const std::vector<size_t> &DatasetIteratorKernel::GetOutputSizeList() const { re

 const std::vector<size_t> &DatasetIteratorKernel::GetWorkspaceSizeList() const { return workspace_size_list_; }

-size_t DatasetIteratorKernel::TensorSize(std::vector<int> &shape) const {
-  if (shape.size() == 0) {
-    return 0;
-  }
-  int size = 1;
-  for (size_t i = 0; i < shape.size(); i++) {
-    size *= shape[i];
-  }
-  return IntToSize(size);
-}
-
 bool DatasetIteratorKernel::Init(const CNodePtr &kernel_node) {
-  output_num_ = GetAttr<int>(kernel_node, "output_num");
   queue_name_ = GetAttr<std::string>(kernel_node, "shared_name");
   auto shapes = GetAttr<const std::vector<std::vector<int>>>(kernel_node, "shapes");
-  auto data_num = shapes.size();
-  if (data_num != 2) {
-    MS_LOG(EXCEPTION) << "Invalid Shapes " << data_num;
-  }
-
-  auto &feature_Shapes = shapes[0];
-  auto size = TensorSize(feature_Shapes);
-  feature_size_ = size * sizeof(float);
-
   auto types = GetAttr<const std::vector<TypePtr>>(kernel_node, "types");
-  if ((types[1]->type_id() != kNumberTypeInt32) && (types[1]->type_id() != kNumberTypeInt64)) {
-    MS_LOG(EXCEPTION) << "Invalid types " << types[1]->type_id();
+  if (shapes.size() != types.size()) {
+    MS_LOG(EXCEPTION) << "Invalid shapes: " << shapes << ", types: " << types;
   }

-  size_t label_unit = (types[1]->type_id() == kNumberTypeInt32) ? sizeof(int32_t) : sizeof(int64_t);
-  size = TensorSize(shapes[1]);
-  label_size_ = size * label_unit;
+  for (size_t i = 0; i < shapes.size(); i++) {
+    int unit = UnitSizeInBytes(types[i]->type_id());
+    int nums = ElementNums(shapes[i]);
+    int bytes = unit * nums;
+    output_size_list_.push_back(bytes);
+    total_bytes_ += bytes;
+  }

-  InitSizeLists();
-
-  handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, feature_size_, label_size_);
+  handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, output_size_list_);
   if (handle_ == HandleMgr::INVALID_HANDLE) {
-    MS_LOG(EXCEPTION) << "Gpu Queue(" << queue_name_ << ") Open Failed: feature_size(" << feature_size_
-                      << "), label_size(" << label_size_ << ")";
+    MS_LOG(EXCEPTION) << "Gpu Queue(" << queue_name_ << ") Open Failed";
   }

   return true;
 }

-void DatasetIteratorKernel::InitSizeLists() {
-  output_size_list_.push_back(feature_size_);
-  output_size_list_.push_back(label_size_);
-}
+void DatasetIteratorKernel::InitSizeLists() { return; }

 bool DatasetIteratorKernel::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
                                    const std::vector<AddressPtr> &outputs, uintptr_t) {
-  void *feature_addr{nullptr}, *label_addr{nullptr};
-  size_t feature_size{0}, label_size{0};
+  void *addr = nullptr;
+  size_t len = 0;

   int repeat = 0;
   while (true) {
-    auto ret = GpuBufferMgr::GetInstance().Front(handle_, &feature_addr, &feature_size, &label_addr, &label_size);
+    auto ret = GpuBufferMgr::GetInstance().Front(handle_, &addr, &len);
     if (ret == device::SUCCESS) {
       break;
     }
@@ -117,19 +90,18 @@ bool DatasetIteratorKernel::Launch(const std::vector<AddressPtr> &, const std::v
     return false;
   }

-  if (feature_size != feature_size_ || label_size != label_size_) {
-    MS_LOG(ERROR) << "DatasetIteratorKernel: Front Error: " << feature_addr << ", " << feature_size << ", "
-                  << label_addr << ", " << label_size;
+  if (total_bytes_ != len) {
+    MS_LOG(ERROR) << "Dataset front error. read: " << len << ", expect: " << total_bytes_ << ", ";
     return false;
   }

-  CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[0]->addr, feature_addr, feature_size, cudaMemcpyDeviceToDevice),
-                             "Cuda Memcpy Failed");
-  CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[1]->addr, label_addr, label_size, cudaMemcpyDeviceToDevice),
-                             "Cuda Memcpy Failed");
+  for (size_t i = 0; i < output_size_list_.size(); i++) {
+    CHECK_CUDA_RET_WITH_EXCEPT(cudaMemcpy(outputs[i]->addr, addr, output_size_list_[i], cudaMemcpyDeviceToDevice),
+                               "Cuda Memcpy Failed");
+    addr = reinterpret_cast<unsigned char *>(addr) + output_size_list_[i];
+  }

   (void)GpuBufferMgr::GetInstance().Pop(handle_);

   return true;
 }
 }  // namespace kernel

kernel/gpu/data/dataset_iterator_kernel.h (View File)

@@ -40,14 +40,9 @@ class DatasetIteratorKernel : public GpuKernel {
   void InitSizeLists() override;

  private:
-  size_t TensorSize(std::vector<int> &) const;
-
   std::string queue_name_;
-  int output_num_;
   unsigned int handle_;
-
-  size_t feature_size_;
-  size_t label_size_;
+  size_t total_bytes_;

   std::vector<size_t> input_size_list_;
   std::vector<size_t> output_size_list_;

kernel/gpu/data/dataset_utils.cc (View File)

@@ -0,0 +1,70 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "kernel/gpu/data/dataset_utils.h"
namespace mindspore {
namespace kernel {
size_t UnitSizeInBytes(const mindspore::TypeId &t) {
size_t bytes = 0;
switch (t) {
case kNumberTypeBool:
case kNumberTypeInt8:
case kNumberTypeUInt8:
bytes = 1;
break;
case kNumberTypeInt16:
case kNumberTypeUInt16:
case kNumberTypeFloat16:
bytes = 2;
break;
case kNumberTypeInt:
case kNumberTypeUInt:
case kNumberTypeInt32:
case kNumberTypeUInt32:
case kNumberTypeFloat:
case kNumberTypeFloat32:
bytes = 4;
break;
case kNumberTypeUInt64:
case kNumberTypeInt64:
case kNumberTypeFloat64:
bytes = 8;
break;
default:
MS_LOG(EXCEPTION) << "Invalid types " << t;
break;
}
return bytes;
}
int ElementNums(const std::vector<int> &shape) {
if (shape.size() == 0) {
return 0;
}
int nums = 1;
for (size_t i = 0; i < shape.size(); i++) {
nums *= shape[i];
}
return nums;
}
} // namespace kernel
} // namespace mindspore
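These two helpers replace the per-kernel TensorSize methods; DatasetInitKernel and DatasetIteratorKernel combine them identically to turn a (shape, type) pair into a per-column byte count. A usage sketch with invented shapes and types:

// Per-column byte counts, as in DatasetInitKernel::Init (illustrative shapes/types).
std::vector<std::vector<int>> shapes = {{32, 224, 224, 3}, {32}};  // e.g. image batch, label batch
std::vector<TypeId> types = {kNumberTypeFloat32, kNumberTypeInt32};
size_t total_bytes = 0;
for (size_t i = 0; i < shapes.size(); i++) {
  size_t bytes = UnitSizeInBytes(types[i]) * ElementNums(shapes[i]);  // unit size * element count
  total_bytes += bytes;  // 32*224*224*3*4 bytes + 32*4 bytes
}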

kernel/gpu/data/dataset_utils.h (View File)

@@ -0,0 +1,29 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_
#define MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_
#include <vector>
#include "ir/dtype/type.h"
namespace mindspore {
namespace kernel {
size_t UnitSizeInBytes(const mindspore::TypeId &t);
int ElementNums(const std::vector<int> &shape);
} // namespace kernel
} // namespace mindspore
#endif // MINDSPORE_CCSRC_KERNEL_GPU_DATASET_UTILS_KERNEL_H_