forked from mindspore-Ecosystem/mindspore
!1025 [Data]fix thread hang when a large number of threads have been opened
Merge pull request !1025 from xulei/filter_op
This commit is contained in:
commit
73e123aede
|
@ -63,9 +63,10 @@ Status FilterOp::operator()() {
|
|||
RETURN_UNEXPECTED_IF_NULL(tree_);
|
||||
filter_queues_.Init(num_workers_, oc_queue_size_);
|
||||
RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks()));
|
||||
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1)));
|
||||
Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1));
|
||||
// Synchronize with TaskManager.
|
||||
TaskManager::FindMe()->Post();
|
||||
RETURN_IF_NOT_OK(rc);
|
||||
RETURN_IF_NOT_OK(Collector());
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ class DbConnector : public Connector<std::unique_ptr<DataBuffer>> {
|
|||
"[ERROR] nullptr detected when getting data from db connector");
|
||||
} else {
|
||||
std::unique_lock<std::mutex> lk(m_);
|
||||
RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; }));
|
||||
RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return (expect_consumer_ == worker_id) || end_of_file_; }));
|
||||
// Once an EOF message is encountered this flag will be set and we can return early.
|
||||
if (end_of_file_) {
|
||||
*result = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF);
|
||||
|
|
Loading…
Reference in New Issue