Add Size() and Capacity() in gpu queue.

This commit is contained in:
anthonyaje 2020-08-19 16:48:31 -04:00
parent cc23f1d819
commit 09a99cf80b
5 changed files with 61 additions and 4 deletions
mindspore/ccsrc

View File

@ -49,7 +49,7 @@ class DatasetInitKernel : public GpuKernel {
std::vector<size_t> workspace_size_list_; std::vector<size_t> workspace_size_list_;
// The capacity of buffer Q. // The capacity of buffer Q.
size_t buffer_q_capacity_{2}; size_t buffer_q_capacity_{1};
}; };
MS_REG_GPU_KERNEL(InitDataSetQueue, DatasetInitKernel) MS_REG_GPU_KERNEL(InitDataSetQueue, DatasetInitKernel)

View File

@ -22,7 +22,15 @@
namespace mindspore { namespace mindspore {
namespace device { namespace device {
GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity) GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
: buffer_(addr), head_(0), tail_(0), shape_(shape), len_(0), capacity_(capacity), stream_(0), node_info_(nullptr) { : buffer_(addr),
head_(0),
tail_(0),
shape_(shape),
len_(0),
size_(0),
capacity_(capacity),
stream_(0),
node_info_(nullptr) {
CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed"); CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
node_info_ = std::make_unique<NodeInfo[]>(capacity); node_info_ = std::make_unique<NodeInfo[]>(capacity);
for (auto item : shape) { for (auto item : shape) {
@ -52,6 +60,7 @@ BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed"); CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
node_info_[tail_].data_ = data; node_info_[tail_].data_ = data;
tail_ = (tail_ + 1) % (capacity_); tail_ = (tail_ + 1) % (capacity_);
++size_;
return SUCCESS; return SUCCESS;
} }
@ -69,6 +78,7 @@ BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
BlockQueueStatus_T GpuQueue::Pop() { BlockQueueStatus_T GpuQueue::Pop() {
head_ = (head_ + 1) % (capacity_); head_ = (head_ + 1) % (capacity_);
--size_;
return SUCCESS; return SUCCESS;
} }

View File

@ -44,13 +44,15 @@ class GpuQueue {
void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; } void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; }
inline bool IsEmpty() const { return head_ == tail_; } inline bool IsEmpty() const { return size_ == 0; }
inline bool IsFull() const { return head_ == ((tail_ + 1) % (capacity_)); } inline bool IsFull() const { return size_ == capacity_; }
BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data); BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data);
BlockQueueStatus_T Front(void **ptr, size_t *len) const; BlockQueueStatus_T Front(void **ptr, size_t *len) const;
BlockQueueStatus_T Pop(); BlockQueueStatus_T Pop();
bool Destroy(); bool Destroy();
size_t Size() { return size_; }
size_t Capacity() { return capacity_; }
private: private:
struct NodeInfo { struct NodeInfo {
@ -63,6 +65,7 @@ class GpuQueue {
size_t tail_; size_t tail_;
std::vector<size_t> shape_; std::vector<size_t> shape_;
size_t len_; size_t len_;
size_t size_;
size_t capacity_; size_t capacity_;
cudaStream_t stream_; cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_; std::unique_ptr<NodeInfo[]> node_info_;
@ -83,6 +86,8 @@ class BlockingQueue {
BlockQueueStatus_T Front(void **ptr, size_t *len); BlockQueueStatus_T Front(void **ptr, size_t *len);
BlockQueueStatus_T Pop(); BlockQueueStatus_T Pop();
bool Destroy(); bool Destroy();
size_t Size() { return queue_->Size(); }
size_t Capacity() { return queue_->Capacity(); }
private: private:
std::mutex mutex_; std::mutex mutex_;

View File

@ -187,5 +187,39 @@ bool GpuBufferMgr::CloseNotify() {
} }
void GpuBufferMgr::CloseConfirm() { sema.Signal(); } void GpuBufferMgr::CloseConfirm() { sema.Signal(); }
size_t GpuBufferMgr::Size(unsigned int handle) {
if (handle == HandleMgr::INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
return 0;
}
return handle_queue_map_.at(handle)->Size();
}
size_t GpuBufferMgr::Size(unsigned int device_id, const std::string &channel_name) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
return 0;
}
return name_queue_map_.at(name)->Size();
}
size_t GpuBufferMgr::Capacity(unsigned int handle) {
if (handle == HandleMgr::INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
return 0;
}
return handle_queue_map_.at(handle)->Capacity();
}
size_t GpuBufferMgr::Capacity(unsigned int device_id, const std::string &channel_name) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
return 0;
}
return name_queue_map_.at(name)->Capacity();
}
} // namespace device } // namespace device
} // namespace mindspore } // namespace mindspore

View File

@ -111,6 +111,14 @@ class GpuBufferMgr {
// call for dataset send thread // call for dataset send thread
EXPORT void CloseConfirm(); EXPORT void CloseConfirm();
EXPORT size_t Size(unsigned int handle);
EXPORT size_t Size(unsigned int device_id, const std::string &channel_name);
EXPORT size_t Capacity(unsigned int handle);
EXPORT size_t Capacity(unsigned int device_id, const std::string &channel_name);
private: private:
void set_device() const; void set_device() const;