diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/data_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/data_queue_op.cc index fdb85ee9439..696bde2069c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/data_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/data_queue_op.cc @@ -288,30 +288,30 @@ Status DataQueueOp::SendDataToAscend() { if (ascend_data_queue_->QueueType() == "Ascend_MBUF") { // Queue control logic for mbuf in host, to prevent from hang/exit abnormally - // case 1: If mbuf queue memory + current row size < 2G then continue send, else suspend; - // case 2: Based on case 1, if elements nums in mbuf < max_queue_limit then continue send, else suspend; - // case 3: If current row size >= 1G, can only send 1 row each time, mbuf_queue_size will always in [0, 1]; + // case 1: If mbuf queue memory + next row memory < 2G, keep sending; otherwise suspend. + // case 2: On top of case 1, if the number of elements in mbuf < max_queue_size, keep sending; otherwise suspend. + // case 3: If row memory >= 1G, only 1 row can be sent at a time, so queue_size will always be in [0, 1]. // note: - // why need queue control: acltdtSendTensor will hang when queue is full, we need to break the thread by - // ourselves what about dynamic shape: yes, memory_per_batch_ collect memory of rows in different shapes. what - // about row too large(>2G): we can promise the first row will be sent and hang in this while, but we dont - // know if the device will out of memory. If not oom, send next row, otherwise device return errors. + // why queue control is needed: acltdtSendTensor will hang when the queue is full, so we must break this thread ourselves. + // what about dynamic shape: supported, memory_per_batch_ collects the memory of rows with different shapes. + // what about a row that is too large (>2G): the first row is guaranteed to be sent, after which sending waits in this while loop, but we don't + // know whether the device will run out of memory. 
If not oom, send next row, otherwise device returns error. // Calculate the memory of next row before sending - size_t mbuf_queue_size = ascend_data_queue_->QueryQueueSize(); - double row_size = curr_row.SizeInBytes() / 1024. / 1024. / 1024.; - memory_per_batch_.push_back(row_size); + size_t queue_size = ascend_data_queue_->QueryQueueSize(); + double row_memory = curr_row.SizeInBytes() / 1024. / 1024. / 1024.; + memory_per_batch_.push_back(row_memory); - const double max_row_size = 2.; - const size_t max_queue_limit = 100; + const double max_queue_memory = 2.; + const size_t max_queue_size = 100; const int64_t send_interval = 1000; - while ((row_size + CalMbufQueueMemory(mbuf_queue_size) >= max_row_size || mbuf_queue_size >= max_queue_limit) && - mbuf_queue_size != 0) { + while ((row_memory + CalMbufQueueMemory(queue_size) >= max_queue_memory || queue_size >= max_queue_size) && + queue_size != 0) { RETURN_IF_INTERRUPTED(); - MS_LOG(INFO) << "Mbuf queue size: " << mbuf_queue_size << ", max queue limit: " << max_queue_limit << ". " - << "Next row memory: " << row_size << ", Mbuf memory: " << CalMbufQueueMemory(mbuf_queue_size); + MS_LOG(DEBUG) << "Mbuf queue size: " << queue_size << ", max queue limit: " << max_queue_size << ". " + << "Next row memory: " << row_memory << ", Mbuf memory: " << CalMbufQueueMemory(queue_size); - mbuf_queue_size = ascend_data_queue_->QueryQueueSize(); + queue_size = ascend_data_queue_->QueryQueueSize(); std::this_thread::sleep_for(std::chrono::microseconds(send_interval)); } }