From 9b8c19d9f1fc54964104d69e459fd1e81a121936 Mon Sep 17 00:00:00 2001 From: xiefangqi Date: Mon, 7 Sep 2020 20:13:31 +0800 Subject: [PATCH] fix dataset thread register problem --- .../minddata/dataset/engine/datasetops/source/clue_op.cc | 5 ++++- .../minddata/dataset/engine/datasetops/source/csv_op.cc | 5 ++++- .../dataset/engine/datasetops/source/text_file_op.cc | 6 ++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc index 3a63ecb866f..3ba902ceae7 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc @@ -221,6 +221,10 @@ Status ClueOp::LoadFile(const std::string &file, const int64_t start_offset, con Status ClueOp::operator()() { RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); + // Move register to the front of launching thread, this will fix the problem + // when thread exit unnormally register will failed occasionally. + RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); + // launch one thread, responsible for filling IoBlockQueue RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this))); @@ -228,7 +232,6 @@ Status ClueOp::operator()() { // must be called after launching workers. TaskManager::FindMe()->Post(); - RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); NotifyToFillIOBlockQueue(); while (!finished_reading_dataset_) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc index 6cff81c1b81..14f4042f4d9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc @@ -518,6 +518,10 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons Status CsvOp::operator()() { RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); + // Move register to the front of launching thread, this will fix the problem + // when thread exit unnormally register will failed occasionally. + RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); + // launch one thread, responsible for filling IoBlockQueue RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this))); @@ -525,7 +529,6 @@ Status CsvOp::operator()() { // must be called after launching workers. TaskManager::FindMe()->Post(); - RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); NotifyToFillIOBlockQueue(); while (!finished_reading_dataset_) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc index f069139859c..4f262334003 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc @@ -378,6 +378,10 @@ void TextFileOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); } Status TextFileOp::operator()() { RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); + // Move register to the front of launching thread, this will fix the problem + // when thread exit unnormally register will failed occasionally. + RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); + // launch one thread, responsible for filling IoBlockQueue RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this))); @@ -387,8 +391,6 @@ Status TextFileOp::operator()() { // must be called after launching workers. TaskManager::FindMe()->Post(); - - RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); NotifyToFillIOBlockQueue(); while (!finished_reading_dataset_) { int64_t buffer_id = 0;