!5286 [MD] minddata gpu add circular_memory to device_queue
Merge pull request !5286 from xiefangqi/xfq_add_circular_gpu_r0.7
commit e3b0ae75ae
@@ -44,9 +44,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
 DeviceQueueOp::~DeviceQueueOp() {}
 
 #ifdef ENABLE_GPUQUE
-void ReleaseData(void *addr) {
+void DeviceQueueOp::ReleaseData(void *addr) {
   if (addr != nullptr) {
-    free(addr);
+    pool_->Deallocate(addr);
   }
 }
 #endif
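The hunk above turns ReleaseData into a DeviceQueueOp member so that returned buffers go back to the shared memory pool instead of being free()'d. Below is a minimal, self-contained sketch of that allocate/release round trip; DemoPool, DemoStatus, and the buffer size are hypothetical stand-ins, not the MindSpore CircularPool/MemoryPool API from circular_pool.h.

// Sketch only: stand-in types illustrating the pool-based allocate/release flow.
#include <cstddef>
#include <cstdlib>
#include <memory>

enum class DemoStatus { kOK, kError };

class DemoPool {
 public:
  // Mirrors the out-parameter style of pool_->Allocate in the diff.
  DemoStatus Allocate(std::size_t len, void **out) {
    *out = std::malloc(len);  // a real circular pool draws from a reusable arena
    return (*out != nullptr) ? DemoStatus::kOK : DemoStatus::kError;
  }
  // Mirrors pool_->Deallocate(addr), which replaces the old free(addr).
  void Deallocate(void *addr) { std::free(addr); }
};

int main() {
  // Created once per op, analogous to CircularPool::CreateCircularPool(&pool_).
  auto pool = std::make_shared<DemoPool>();
  void *buf = nullptr;
  if (pool->Allocate(1024, &buf) != DemoStatus::kOK) return 1;
  // ... fill buf and hand it to the device queue ...
  pool->Deallocate(buf);  // buffers are returned to the pool, never free()'d directly
  return 0;
}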
@@ -87,6 +87,7 @@ Status DeviceQueueOp::operator()() {
 #endif
   } else if (device_type_ == DeviceType::GPU) {
 #ifdef ENABLE_GPUQUE
+    RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_));
     RETURN_IF_NOT_OK(SendDataToGPU());
 #endif
   } else if (device_type_ == DeviceType::CPU) {
@@ -187,6 +188,7 @@ Status DeviceQueueOp::SendDataToGPU() {
   bool is_break_loop = false;
   bool is_open = false;
   uint32_t handle = INVALID_HANDLE;
+  auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1);
 
   std::unique_ptr<DataBuffer> current_buffer;
   RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
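Because ReleaseData is now a member function, the release callback handed to GpuBufferMgr must carry the owning DeviceQueueOp instance, which is what the std::bind expression above provides. A small stand-alone sketch of the same pattern (DemoOp and DemoPool are simplified stand-ins, not the real classes):

// Sketch only: binding a member function as a void(void *) release callback.
#include <cstdlib>
#include <functional>

struct DemoPool {
  void Deallocate(void *addr) { std::free(addr); }
};

class DemoOp {
 public:
  void ReleaseData(void *addr) {
    if (addr != nullptr) pool_.Deallocate(addr);  // needs per-instance state
  }
  std::function<void(void *)> MakeReleaseFunction() {
    // Same shape as: std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1)
    return std::bind(&DemoOp::ReleaseData, this, std::placeholders::_1);
  }

 private:
  DemoPool pool_;
};

int main() {
  DemoOp op;
  auto release_function = op.MakeReleaseFunction();
  void *p = std::malloc(64);
  release_function(p);  // forwards to op.ReleaseData(p)
  return 0;
}

A lambda capturing this would work just as well; the diff happens to use std::bind.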
@@ -204,7 +206,7 @@ Status DeviceQueueOp::SendDataToGPU() {
         data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
       }
       if (!is_open) {
-        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData);
+        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
         if (handle == INVALID_HANDLE) {
           return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed");
         }
@@ -246,7 +248,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
     BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
     if (ret) {
       for (int i = 0; i < items.size(); i++) {
-        free(items[i].data_ptr_);
+        ReleaseData(items[i].data_ptr_);
       }
       if (ret == BlockQueueStatus_T::ERROR_INPUT) {
         return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "invalid input Data, please check it.");
@@ -267,7 +269,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
 Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
   int i = 0;
   for (auto &sub_item : *items) {
-    sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_);
+    RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
     if (sub_item.data_ptr_ == nullptr) {
       return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed.");
     }
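The MallocForGPUData hunk swaps a raw malloc for a Status-returning pool allocation guarded by RETURN_IF_NOT_OK. The sketch below shows that error-propagation shape with a reduced macro and types; none of these names are the actual MindSpore definitions.

// Sketch only: Status-style allocation with early return on failure.
#include <cstddef>
#include <cstdlib>
#include <vector>

enum class DemoStatus { kOK, kError };

#define DEMO_RETURN_IF_NOT_OK(expr)         \
  do {                                      \
    DemoStatus _s = (expr);                 \
    if (_s != DemoStatus::kOK) return _s;   \
  } while (false)

struct DemoPool {
  DemoStatus Allocate(std::size_t len, void **out) {
    *out = std::malloc(len);
    return (*out != nullptr) ? DemoStatus::kOK : DemoStatus::kError;
  }
  void Deallocate(void *addr) { std::free(addr); }
};

struct DemoItem {
  void *data_ptr_ = nullptr;
  std::size_t data_len_ = 0;
};

// Mirrors the loop shape in MallocForGPUData: every item's buffer comes from
// the shared pool, and the first failed allocation aborts the whole call.
DemoStatus FillItems(DemoPool *pool, std::vector<DemoItem> *items) {
  for (auto &sub_item : *items) {
    DEMO_RETURN_IF_NOT_OK(pool->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
  }
  return DemoStatus::kOK;
}

int main() {
  DemoPool pool;
  std::vector<DemoItem> items(2);
  items[0].data_len_ = 128;
  items[1].data_len_ = 256;
  if (FillItems(&pool, &items) != DemoStatus::kOK) return 1;
  for (auto &item : items) pool.Deallocate(item.data_ptr_);
  return 0;
}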
@@ -29,6 +29,7 @@
 #endif
 
 #ifdef ENABLE_GPUQUE
+#include "minddata/dataset/util/circular_pool.h"
 #include "runtime/device/gpu/gpu_buffer_mgr.h"
 using mindspore::device::BlockQueueStatus_T;
 using mindspore::device::GpuBufferMgr;
@@ -162,6 +163,9 @@ class DeviceQueueOp : public PipelineOp {
   Status SendDataToGPU();
   Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle);
   Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
+  void ReleaseData(void *addr);
+
+  std::shared_ptr<MemoryPool> pool_;
 #endif
 
   Status SendDataToCPU();