Fix GPU buffer re-entry issue

This commit is contained in:
wilfChen 2022-03-26 10:16:06 +08:00
parent 8589335879
commit cf829d8415
8 changed files with 99 additions and 144 deletions

View File

@ -442,10 +442,9 @@ Status DeviceQueueOp::PushDataToGPU() {
auto items = std::move(item.data_item);
bool eoe_flag = item.eoe_flag;
int64_t send_batch = 0;
uint32_t handle = INVALID_HANDLE;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, {}, release_function);
if (handle == INVALID_HANDLE) {
auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function);
if (ret != BlockQueueStatus_T::SUCCESS) {
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
"[Internal ERROR] Failed to open channel for sending data.");
}
@ -463,7 +462,7 @@ Status DeviceQueueOp::PushDataToGPU() {
return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__,
"[Internal ERROR] Failed to prefetch data in current PS mode(cache data when sending).");
}
RETURN_IF_NOT_OK(RetryPushData(handle, items, is_profiling_enable, &push_cost));
RETURN_IF_NOT_OK(RetryPushData(items, is_profiling_enable, &push_cost));
#ifndef ENABLE_SECURITY
ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, push_cost, &batch_start_time, &end_time,
gpu_connector_->capacity(), gpu_connector_->size());
@ -491,7 +490,7 @@ Status DeviceQueueOp::PushDataToGPU() {
eoe_flag = item.eoe_flag;
// If the dataset sends more batches than the GPU consumes, the GPU will crash because no signal is notified.
if (rc.IsError()) {
GpuBufferMgr::GetInstance().Close(handle);
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
return rc;
}
@ -507,13 +506,12 @@ Status DeviceQueueOp::PushDataToGPU() {
tree_->SetFinished();
MS_LOG(INFO) << "ExecutionTree finished. Device queue pushed number of batches: " << send_batch;
GpuBufferMgr::GetInstance().Close(handle);
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
return Status::OK();
}
Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &items, const bool profiling,
uint64_t *push_time) {
Status DeviceQueueOp::RetryPushData(const std::vector<DataItemGpu> &items, const bool profiling, uint64_t *push_time) {
bool flag_log = false;
#ifndef ENABLE_SECURITY
uint64_t start_time = 0;
@ -522,7 +520,7 @@ Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector<DataI
}
#endif
while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(channel_name_, items, WAIT_TIME);
if (ret) {
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
return Status(
@ -673,16 +671,16 @@ Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items,
Status DeviceQueueOp::ClearDevice() {
MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, {}, release_function);
if (handle == INVALID_HANDLE) {
auto ret = GpuBufferMgr::GetInstance().Open(channel_name_, {}, release_function);
if (ret != BlockQueueStatus_T::SUCCESS) {
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
"[Internal ERROR] Failed to open channel for clearing the device.");
}
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Clear(handle);
ret = GpuBufferMgr::GetInstance().Clear(channel_name_);
CHECK_FAIL_RETURN_UNEXPECTED(!ret, "Failed to clear the device.");
GpuBufferMgr::GetInstance().Close(handle);
GpuBufferMgr::GetInstance().Close(channel_name_);
GpuBufferMgr::GetInstance().CloseConfirm();
return Status::OK();
}

View File

@ -145,7 +145,7 @@ class DeviceQueueOp : public PipelineOp {
#ifdef ENABLE_GPUQUE
Status SendDataToGPU();
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
Status RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &data, bool profiling, uint64_t *push_time);
Status RetryPushData(const std::vector<DataItemGpu> &data, bool profiling, uint64_t *push_time);
void ReleaseData(void *addr, int32_t worker_id);
Status LaunchParallelCopyThread();
Status PushDataToGPU();

View File

@ -30,7 +30,7 @@
namespace mindspore {
namespace device {
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, QUEUE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
struct DataItemGpu {
int32_t worker_id_{0};

View File

@ -26,67 +26,51 @@ namespace py = pybind11;
namespace mindspore {
namespace device {
static unsigned int AllocHandle() {
static std::atomic<unsigned int> handle(1);
return handle.fetch_add(1, std::memory_order_relaxed);
}
GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept {
static GpuBufferMgr instance;
return instance;
}
BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::string &channel_name, void *addr,
const std::vector<size_t> &shape, const size_t &capacity) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue already exist: " << name;
BlockQueueStatus_T GpuBufferMgr::Create(const std::string &channel_name, void *addr, const std::vector<size_t> &shape,
const size_t &capacity) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " created.";
if (name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue already exist: " << channel_name;
return QUEUE_EXIST;
}
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
BlockQueueStatus_T rt = queue->Create(addr, shape, capacity);
if (rt != SUCCESS) {
MS_LOG(ERROR) << "Queue: " << channel_name << "create failed: " << rt;
return rt;
}
(void)name_queue_map_.insert(std::make_pair(name, queue));
(void)name_queue_map_.insert(std::make_pair(channel_name, queue));
init_ = true;
return SUCCESS;
}
unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
const std::vector<size_t> &shape, const std::function<void(void *, int32_t)> func) {
BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector<size_t> &shape,
const std::function<void(void *, int32_t)> func) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " open.";
set_device();
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
return INVALID_HANDLE;
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
unsigned int handle = AllocHandle();
if (handle == INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
return INVALID_HANDLE;
}
(void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name]));
name_queue_map_[name]->RegisterRelease(func);
name_queue_map_[channel_name]->RegisterRelease(func);
open_by_dataset_++;
return handle;
return SUCCESS;
}
unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
const std::vector<size_t> &shape) {
BlockQueueStatus_T GpuBufferMgr::Open(const std::string &channel_name, const std::vector<size_t> &shape) {
MS_LOG(INFO) << "Gpu queue: " << channel_name << " open.";
set_device();
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
return INVALID_HANDLE;
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
unsigned int handle = AllocHandle();
if (handle == INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
return INVALID_HANDLE;
}
(void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name]));
return handle;
return SUCCESS;
}
void GpuBufferMgr::set_device_id(int device_id) { cur_dev_id_ = device_id; }
@ -105,44 +89,48 @@ void GpuBufferMgr::set_device() const {
}
}
BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, const std::vector<DataItemGpu> &data,
BlockQueueStatus_T GpuBufferMgr::Push(const std::string &channel_name, const std::vector<DataItemGpu> &data,
unsigned int timeout_in_sec) {
auto iter = handle_queue_map_.find(handle);
if (iter == handle_queue_map_.end()) {
return HANDLE_NOT_EXIST;
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Push(data, timeout_in_sec);
}
BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, std::vector<DataItemGpu> *data) {
auto iter = handle_queue_map_.find(handle);
if (iter == handle_queue_map_.end()) {
return HANDLE_NOT_EXIST;
BlockQueueStatus_T GpuBufferMgr::Front(const std::string &channel_name, std::vector<DataItemGpu> *data) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Front(data);
}
BlockQueueStatus_T GpuBufferMgr::Pop(unsigned int handle) {
auto iter = handle_queue_map_.find(handle);
if (iter == handle_queue_map_.end()) {
return HANDLE_NOT_EXIST;
BlockQueueStatus_T GpuBufferMgr::Pop(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Pop();
}
BlockQueueStatus_T GpuBufferMgr::Clear(unsigned int handle) {
auto iter = handle_queue_map_.find(handle);
if (iter == handle_queue_map_.end()) {
return HANDLE_NOT_EXIST;
BlockQueueStatus_T GpuBufferMgr::Clear(const std::string &channel_name) {
auto iter = name_queue_map_.find(channel_name);
if (iter == name_queue_map_.end()) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return QUEUE_NOT_EXIST;
}
return iter->second->Clear();
}
void GpuBufferMgr::Close(unsigned int handle) noexcept {
if (!handle_queue_map_.count(handle)) {
return;
}
(void)handle_queue_map_.erase(handle);
void GpuBufferMgr::Close(const std::string &channel_name) noexcept {
MS_LOG(INFO) << "Close the queue: " << channel_name;
return;
}
@ -151,6 +139,7 @@ bool GpuBufferMgr::IsInit() const { return init_; }
bool GpuBufferMgr::IsClosed() const { return closed_; }
bool GpuBufferMgr::Destroy() {
MS_LOG(INFO) << "Destroy all GPU queue.";
for (auto iter = name_queue_map_.begin(); iter != name_queue_map_.end(); ++iter) {
std::shared_ptr<BlockingQueue> queue = iter->second;
if (queue != nullptr) {
@ -164,9 +153,8 @@ bool GpuBufferMgr::Destroy() {
return true;
}
inline bool GpuBufferMgr::isCreated(unsigned int device_id, const std::string &channel_name) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (name_queue_map_.count(name) != 0) {
inline bool GpuBufferMgr::isCreated(const std::string &channel_name) {
if (name_queue_map_.count(channel_name) != 0) {
return true;
}
return false;
@ -195,46 +183,20 @@ bool GpuBufferMgr::CloseNotify() {
void GpuBufferMgr::CloseConfirm() { sema.Signal(); }
size_t GpuBufferMgr::Size(unsigned int handle) {
if (handle == INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
size_t GpuBufferMgr::Size(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
if (handle_queue_map_.count(handle) == 0) {
MS_LOG(ERROR) << "Handle not exist " << handle;
return 0;
}
return handle_queue_map_.at(handle)->Size();
return name_queue_map_.at(channel_name)->Size();
}
size_t GpuBufferMgr::Size(unsigned int device_id, const std::string &channel_name) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
size_t GpuBufferMgr::Capacity(const std::string &channel_name) {
if (!name_queue_map_.count(channel_name)) {
MS_LOG(ERROR) << "Queue not exist " << channel_name;
return 0;
}
return name_queue_map_.at(name)->Size();
}
size_t GpuBufferMgr::Capacity(unsigned int handle) {
if (handle == INVALID_HANDLE) {
MS_LOG(ERROR) << "handle is invalid";
return 0;
}
if (handle_queue_map_.count(handle) == 0) {
MS_LOG(ERROR) << "Handle not exist " << handle;
return 0;
}
return handle_queue_map_.at(handle)->Capacity();
}
size_t GpuBufferMgr::Capacity(unsigned int device_id, const std::string &channel_name) {
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {
MS_LOG(ERROR) << "Queue not exist " << name;
return 0;
}
return name_queue_map_.at(name)->Capacity();
return name_queue_map_.at(channel_name)->Capacity();
}
} // namespace device
} // namespace mindspore

View File

@ -62,33 +62,31 @@ class Semaphore {
class GpuBufferMgr {
public:
static const unsigned int INVALID_HANDLE = 0xffffffffUL;
EXPORT GpuBufferMgr() : cur_dev_id_(0), init_(false), closed_(false), open_by_dataset_(0) {}
EXPORT virtual ~GpuBufferMgr() = default;
EXPORT static GpuBufferMgr &GetInstance() noexcept;
EXPORT BlockQueueStatus_T Create(unsigned int device_id, const std::string &channel_name, void *addr,
const std::vector<size_t> &shape, const size_t &capacity);
EXPORT BlockQueueStatus_T Create(const std::string &channel_name, void *addr, const std::vector<size_t> &shape,
const size_t &capacity);
// call for Push thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape,
std::function<void(void *, int32_t)> func);
EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector<size_t> &shape,
std::function<void(void *, int32_t)> func);
// call for Front/Pop thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape);
EXPORT BlockQueueStatus_T Open(const std::string &channel_name, const std::vector<size_t> &shape);
EXPORT BlockQueueStatus_T Push(unsigned int handle, const std::vector<DataItemGpu> &data,
EXPORT BlockQueueStatus_T Push(const std::string &channel_name, const std::vector<DataItemGpu> &data,
unsigned int timeout_in_sec);
EXPORT BlockQueueStatus_T Front(unsigned int handle, std::vector<DataItemGpu> *data);
EXPORT BlockQueueStatus_T Pop(unsigned int handle);
EXPORT BlockQueueStatus_T Clear(unsigned int handle);
EXPORT BlockQueueStatus_T Front(const std::string &channel_name, std::vector<DataItemGpu> *data);
EXPORT BlockQueueStatus_T Pop(const std::string &channel_name);
EXPORT BlockQueueStatus_T Clear(const std::string &channel_name);
EXPORT void set_device_id(int device_id);
EXPORT void Close(unsigned int handle) noexcept;
EXPORT void Close(const std::string &channel_name) noexcept;
EXPORT bool IsInit() const;
@ -102,13 +100,9 @@ class GpuBufferMgr {
// call for dataset send thread
EXPORT void CloseConfirm();
EXPORT size_t Size(unsigned int handle);
EXPORT size_t Size(const std::string &channel_name);
EXPORT size_t Size(unsigned int device_id, const std::string &channel_name);
EXPORT size_t Capacity(unsigned int handle);
EXPORT size_t Capacity(unsigned int device_id, const std::string &channel_name);
EXPORT size_t Capacity(const std::string &channel_name);
private:
void set_device() const;
@ -122,10 +116,9 @@ class GpuBufferMgr {
int open_by_dataset_;
Semaphore sema;
std::map<unsigned int, std::shared_ptr<BlockingQueue>> handle_queue_map_;
std::map<std::string, std::shared_ptr<BlockingQueue>> name_queue_map_;
inline bool isCreated(unsigned int device_id, const std::string &channel_name);
inline bool isCreated(const std::string &channel_name);
GpuBufferMgr(const GpuBufferMgr &) = delete;
GpuBufferMgr &operator=(const GpuBufferMgr &) = delete;

View File

@ -59,7 +59,7 @@ bool DatasetInitKernelMod::Launch(const std::vector<AddressPtr> &, const std::ve
<< len << "].";
}
auto status = GpuBufferMgr::GetInstance().Create(0, queue_name_, addr, shapes_, buffer_q_capacity_);
auto status = GpuBufferMgr::GetInstance().Create(queue_name_, addr, shapes_, buffer_q_capacity_);
if (status) {
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', init Dataset Failed. len: " << len << ", status:" << status;
}

View File

@ -37,9 +37,9 @@ namespace kernel {
using mindspore::device::GpuBufferMgr;
DatasetIteratorKernelMod::DatasetIteratorKernelMod()
: handle_(GpuBufferMgr::INVALID_HANDLE), profiling_enable_(false), profiling_op_(nullptr) {}
: is_opened_(false), profiling_enable_(false), profiling_op_(nullptr) {}
DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { GpuBufferMgr::GetInstance().Close(handle_); }
DatasetIteratorKernelMod::~DatasetIteratorKernelMod() { GpuBufferMgr::GetInstance().Close(queue_name_); }
bool DatasetIteratorKernelMod::Init(const CNodePtr &kernel_node) {
MS_EXCEPTION_IF_NULL(kernel_node);
@ -92,10 +92,10 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
profiling_enable_ = profiler_inst->GetEnableFlag();
if (profiling_enable_) {
start_time_stamp = profiling_op_->GetTimeStamp();
queue_size = GpuBufferMgr::GetInstance().Size(handle_);
queue_size = GpuBufferMgr::GetInstance().Size(queue_name_);
}
#endif
auto ret = GpuBufferMgr::GetInstance().Front(handle_, data);
auto ret = GpuBufferMgr::GetInstance().Front(queue_name_, data);
if (ret == device::SUCCESS) {
#ifndef ENABLE_SECURITY
if (profiling_enable_) {
@ -115,7 +115,7 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', get data timeout";
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', get data timeout. Queue name: " << queue_name_;
}
}
#ifndef ENABLE_SECURITY
@ -124,7 +124,8 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
profiling_op_->RecordData(queue_size, start_time_stamp, end_time_stamp);
}
#endif
MS_LOG(ERROR) << "For '" << kernel_name_ << "', get data failed, errcode " << ret;
MS_LOG(ERROR) << "For '" << kernel_name_ << "', get data failed, errcode " << ret
<< ", queue name: " << queue_name_;
return false;
}
return true;
@ -132,11 +133,12 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataItemGpu> *data) {
bool DatasetIteratorKernelMod::Launch(const std::vector<AddressPtr> &, const std::vector<AddressPtr> &,
const std::vector<AddressPtr> &outputs, void *stream) {
if (handle_ == GpuBufferMgr::INVALID_HANDLE) {
handle_ = GpuBufferMgr::GetInstance().Open(0, queue_name_, output_size_list_);
if (handle_ == GpuBufferMgr::INVALID_HANDLE) {
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', gpu Queue(" << queue_name_ << ") Open Failed";
if (!is_opened_) {
auto ret = GpuBufferMgr::GetInstance().Open(queue_name_, output_size_list_);
if (ret != device::BlockQueueStatus_T::SUCCESS) {
MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', gpu Queue(" << queue_name_ << ") Open Failed: " << ret;
}
is_opened_ = true;
}
if (!ReadDevice(&output_data_)) {
@ -155,7 +157,7 @@ bool DatasetIteratorKernelMod::Launch(const std::vector<AddressPtr> &, const std
CHECK_CUDA_RET_WITH_EXCEPT(kernel_node_, cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(stream)),
"cudaStreamSynchronize failed");
(void)GpuBufferMgr::GetInstance().Pop(handle_);
(void)GpuBufferMgr::GetInstance().Pop(queue_name_);
return true;
}

View File

@ -44,7 +44,7 @@ class DatasetIteratorKernelMod : public NativeGpuKernelMod {
private:
bool ReadDevice(std::vector<DataItemGpu> *data);
std::string queue_name_;
unsigned int handle_;
bool is_opened_;
bool profiling_enable_;
std::shared_ptr<GetNextProfiling> profiling_op_;
std::vector<TypeId> types_;