!49060 update description of mbuf control logic

Merge pull request !49060 from luoyang/log
This commit is contained in:
i-robot 2023-02-18 02:28:12 +00:00 committed by Gitee
commit f6946cafab
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
1 changed files with 17 additions and 17 deletions

View File

@ -288,30 +288,30 @@ Status DataQueueOp::SendDataToAscend() {
if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
// Queue control logic for mbuf in host, to prevent from hang/exit abnormally
// case 1: If mbuf queue memory + current row size < 2G then continue send, else suspend;
// case 2: Based on case 1, if elements nums in mbuf < max_queue_limit then continue send, else suspend;
// case 3: If current row size >= 1G, can only send 1 row each time, mbuf_queue_size will always in [0, 1];
// case 1: If mbuf queue memory + next row memory < 2G then continue send, else suspend;
// case 2: Based on case 1, if element nums in mbuf < max_queue_size then continue send, else suspend;
// case 3: If row memory >= 1G, can only send 1 row each time, queue_size will always in [0, 1];
// note:
// why need queue control: acltdtSendTensor will hang when queue is full, we need to break the thread by
// ourselves what about dynamic shape: yes, memory_per_batch_ collect memory of rows in different shapes. what
// about row too large(>2G): we can promise the first row will be sent and hang in this while, but we dont
// know if the device will out of memory. If not oom, send next row, otherwise device return errors.
// why need queue control: acltdtSendTensor will hang when queue is full, need to break this thread by ourselves
// how about dynamic shape: yes, memory_per_batch_ collect memory of rows in different shapes.
// how about row too large(>2G): we can promise the first row will be sent and hang in this while, but we dont
// know if the device will out of memory. If not oom, send next row, otherwise device returns error.
// Calculate the memory of next row before sending
size_t mbuf_queue_size = ascend_data_queue_->QueryQueueSize();
double row_size = curr_row.SizeInBytes() / 1024. / 1024. / 1024.;
memory_per_batch_.push_back(row_size);
size_t queue_size = ascend_data_queue_->QueryQueueSize();
double row_memory = curr_row.SizeInBytes() / 1024. / 1024. / 1024.;
memory_per_batch_.push_back(row_memory);
const double max_row_size = 2.;
const size_t max_queue_limit = 100;
const double max_queue_memory = 2.;
const size_t max_queue_size = 100;
const int64_t send_interval = 1000;
while ((row_size + CalMbufQueueMemory(mbuf_queue_size) >= max_row_size || mbuf_queue_size >= max_queue_limit) &&
mbuf_queue_size != 0) {
while ((row_memory + CalMbufQueueMemory(queue_size) >= max_queue_memory || queue_size >= max_queue_size) &&
queue_size != 0) {
RETURN_IF_INTERRUPTED();
MS_LOG(INFO) << "Mbuf queue size: " << mbuf_queue_size << ", max queue limit: " << max_queue_limit << ". "
<< "Next row memory: " << row_size << ", Mbuf memory: " << CalMbufQueueMemory(mbuf_queue_size);
MS_LOG(DEBUG) << "Mbuf queue size: " << queue_size << ", max queue limit: " << max_queue_size << ". "
<< "Next row memory: " << row_memory << ", Mbuf memory: " << CalMbufQueueMemory(queue_size);
mbuf_queue_size = ascend_data_queue_->QueryQueueSize();
queue_size = ascend_data_queue_->QueryQueueSize();
std::this_thread::sleep_for(std::chrono::microseconds(send_interval));
}
}