forked from mindspore-Ecosystem/mindspore
!22177 dataset: detect first batch when sending data into device
Merge pull request !22177 from ms_yan/detect_first_batch
Commit 64ad327fb8
@ -40,6 +40,7 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
      total_batch_(total_batch),
      create_data_info_queue_(create_data_info_queue),
      data_info_queue_ptr_(nullptr),
      first_fetch_flag_(false),
      first_push_flag_(false) {
#ifdef ENABLE_GPUQUE
  // Get the total device num of current machine
@ -125,6 +126,8 @@ Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
}

Status DeviceQueueOp::operator()() {
  RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
    "Detect first batch", std::bind(&DeviceQueueOp::DetectFirstBatch, this), nullptr, id()));
  TaskManager::FindMe()->Post();
  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
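The new `CreateAsyncTask` call registers `DetectFirstBatch` as a background monitor that runs alongside the main send loop. A minimal standalone sketch of the same watchdog pattern, with plain `std::thread` and `std::atomic` standing in for MindSpore's task tree (the class and method names below are illustrative, not the project's API):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Illustrative watchdog: a worker flips an atomic flag when the first batch
// arrives; a detached monitor polls the flag and warns while it stays unset.
class FirstBatchMonitor {
 public:
  void Start() {
    watchdog_ = std::thread([this] {
      int warnings = 0;
      auto start = std::chrono::steady_clock::now();
      while (!first_fetch_flag_ && warnings < 3) {
        std::this_thread::sleep_for(std::chrono::seconds(2));  // 2 s poll, as in DetectFirstBatch
        if (std::chrono::steady_clock::now() - start > std::chrono::seconds(25)) {
          std::cerr << "warning: first batch still not fetched after 25 s\n";
          ++warnings;
        }
      }
    });
  }
  void MarkFirstFetch() { first_fetch_flag_ = true; }
  ~FirstBatchMonitor() {
    if (watchdog_.joinable()) watchdog_.join();
  }

 private:
  std::atomic<bool> first_fetch_flag_{false};
  std::thread watchdog_;
};

In `DeviceQueueOp`, the equivalent of `MarkFirstFetch()` is the `first_fetch_flag_ = true;` assignment placed right after the first `FetchNextTensorRow` in each of the send paths below.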
@ -163,6 +166,7 @@ Status DeviceQueueOp::operator()() {
Status DeviceQueueOp::SendDataToAscend() {
  MS_LOG(INFO) << "Device queue, sending data to Ascend.";
  uint64_t batch_start_time, end_time;
  uint64_t batch_record_start, batch_record_end;
  int64_t send_batch = 0;
  int32_t tdt_cost = 0;
  int32_t connector_size = 0;
@ -185,8 +189,10 @@ Status DeviceQueueOp::SendDataToAscend() {
  md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
  md_channel_info_->RecordPreprocessBatch(0);
#endif
  batch_record_start = ProfilingTime::GetCurMilliSecond();
  TensorRow curr_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
  first_fetch_flag_ = true;
  while (!curr_row.eof() && !is_break_loop) {
    while (!curr_row.eoe() && !is_break_loop) {
      RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
@ -199,9 +205,10 @@ Status DeviceQueueOp::SendDataToAscend() {
#endif
      RETURN_IF_NOT_OK(SendRowToTdt(curr_row, isProfilingEnable, &tdt_cost));
      if (first_push_flag_ != true) {
        MS_LOG(INFO) << "Loading dataset and pushing first batch into device successful.";
        first_push_flag_ = true;
      }
      DetectPerBatchTime(&batch_record_start, &batch_record_end);
      ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
                        connector_capacity, connector_size);
      send_batch++;
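The `first_push_flag_` guard makes the "first batch pushed" INFO line fire exactly once per run. Since the flag is only ever touched from this send thread, a plain bool test suffices; if the same latch ever had to be shared across threads, an atomic exchange would keep it race-free. A hedged sketch of that alternative (an assumption for illustration, not what the commit does):

// Shared-latch variant; assumes the flag were declared std::atomic<bool>:
std::atomic<bool> first_push_flag{false};
if (!first_push_flag.exchange(true)) {  // exchange returns the old value: false only for the first caller
  MS_LOG(INFO) << "Loading dataset and pushing first batch into device successful.";
}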
@ -528,8 +535,11 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
Status DeviceQueueOp::SendDataToGPU() {
  RETURN_IF_NOT_OK(LaunchParallelCopyThread());
  MS_LOG(INFO) << "Device queue, sending data to GPU.";
  uint64_t batch_record_start, batch_record_end;
  batch_record_start = ProfilingTime::GetCurMilliSecond();
  TensorRow current_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
  first_fetch_flag_ = true;
  int64_t num_buf = 0;
  bool is_break_loop = false;
  while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
@ -538,9 +548,10 @@ Status DeviceQueueOp::SendDataToGPU() {
    RETURN_IF_NOT_OK(CheckExceptions(current_row));
    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
    if (first_push_flag_ != true) {
      MS_LOG(INFO) << "Loading dataset and pushing first batch into device successful.";
      first_push_flag_ = true;
    }
    DetectPerBatchTime(&batch_record_start, &batch_record_end);
    if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
    } else {
@ -597,6 +608,9 @@ Status DeviceQueueOp::SendDataToCPU() {
  TensorRow curr_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));

  if (!first_fetch_flag_) {
    first_fetch_flag_ = true;
  }
  if (!curr_row.empty()) {
    for (auto &tensor : curr_row) {
      MS_LOG(DEBUG) << "Feature size is " << tensor->SizeInBytes() << ".";
@ -643,5 +657,37 @@ void DeviceQueueOp::ProfilingRecorder(bool isProfilingEnable, std::shared_ptr<De
    profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
  }
}
Status DeviceQueueOp::DetectFirstBatch() {
  TaskManager::FindMe()->Post();
  uint8_t count_num = 0;
  uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
  while (true) {
    RETURN_IF_INTERRUPTED();
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    uint64_t temp_end_time = ProfilingTime::GetCurMilliSecond();
    // If the first batch has been fetched, or the warning has already been printed 3 or more
    // times without the first batch being fetched, exit the detection loop.
    if (first_fetch_flag_ == true || count_num > 2) {
      break;
    } else if (temp_end_time - temp_start_time > kTimeOutMilliSeconds) {
      count_num++;
      MS_LOG(WARNING) << "Bad performance attention: more than 25 seconds have passed and the first batch of data "
                         "still cannot be fetched from the dataset pipeline, which might cause a `GetNext` timeout "
                         "error. Please check the dataset processing performance and optimize it. Note: the shuffle "
                         "operation is enabled by default when loading a dataset, which may affect the first batch "
                         "loading time.";
    }
  }
  return Status::OK();
}
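With the 2000 ms sleep and kTimeOutMilliSeconds = 25000, the timing works out as follows: the loop wakes every 2 s, so the elapsed time first exceeds 25 s on the 13th wake-up (~26 s), producing the first warning. Because temp_start_time is never reset, the next two wake-ups (~28 s and ~30 s) warn as well, and once count_num reaches 3 the following check breaks out of the loop (~32 s). If the send loop sets first_fetch_flag_ at any point, the task exits at its next 2 s check instead.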
void DeviceQueueOp::DetectPerBatchTime(uint64_t *start_time, uint64_t *end_time) {
  *end_time = ProfilingTime::GetCurMilliSecond();
  if (*end_time - *start_time > kTimeOutMilliSeconds) {
    MS_LOG(WARNING) << "Bad performance attention: it takes more than 25 seconds to fetch and send a batch of data "
                       "into the device, which might cause a `GetNext` timeout error. Please check the dataset "
                       "processing performance and optimize it.";
  }
  *start_time = *end_time;
}
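Unlike DetectFirstBatch, this check uses a rolling window: each call stamps *end_time with the current time, warns if more than 25 s have passed since the previous stamp, then moves *start_time forward, so every batch is measured against the one before it. A minimal call pattern mirroring the send loops above (HasNextBatch and SendOneBatch are hypothetical placeholders, not project functions):

uint64_t batch_record_start = ProfilingTime::GetCurMilliSecond();
uint64_t batch_record_end = 0;
while (HasNextBatch()) {  // hypothetical: stands for the eof/eoe checks in the real loops
  SendOneBatch();         // hypothetical: stands for SendRowToTdt / receive_queues_ push
  DetectPerBatchTime(&batch_record_start, &batch_record_end);  // warns if this batch took > 25 s
}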
}  // namespace dataset
}  // namespace mindspore
@ -50,6 +50,8 @@ namespace mindspore {
namespace dataset {
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;

constexpr int32_t kTimeOutMilliSeconds = 25000;
const int kDataInfoQueueCapacity = 128;
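kTimeOutMilliSeconds (25000 ms) is the single threshold behind both warnings: DetectFirstBatch compares the total wait against it, and DetectPerBatchTime compares each per-batch gap against it, which is why both messages speak of 25 seconds.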
class DeviceQueueOp : public PipelineOp {
@ -155,6 +157,14 @@ class DeviceQueueOp : public PipelineOp {

  Status SendDataToCPU();

  // Create an async thread to detect whether it takes too long to fetch the first batch
  Status DetectFirstBatch();

  // Detect the cost time of each batch; print an alarm message if it costs too long
  void DetectPerBatchTime(uint64_t *start_time, uint64_t *end_time);

  std::atomic<bool> first_fetch_flag_;

  std::unique_ptr<ChildIterator> child_iterator_;
  std::string channel_name_;
  DeviceType device_type_;
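Of the two new flags, only first_fetch_flag_ is declared std::atomic<bool>, presumably because it crosses threads: the send loop writes it while the DetectFirstBatch task reads it. first_push_flag_ is read and written only inside the send thread, so it can stay a plain bool.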
@ -211,7 +211,7 @@ class Slice(TensorOperation):

class Relational(IntEnum):
    """
    Relationship operator.

    Possible enumeration values are: Relational.EQ, Relational.NE, Relational.GT, Relational.GE, Relational.LT,