[MSLITE][DEVELOP] fix bug of thread pool

This commit is contained in:
yangruoqi713 2021-09-17 17:40:49 +08:00
parent a7f17d5c8e
commit ae3408882f
6 changed files with 60 additions and 14 deletions

View File

@ -67,6 +67,10 @@ void ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t m
inner_pool_->DisableOccupiedActorThread();
inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num);
}
if (inner_pool_ != nullptr) {
inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
inner_pool_->SetSpinCountMaxValue();
}
}
}
@ -253,7 +257,6 @@ void ActorMgr::Terminate(const AID &id) {
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
MINDRT_OOM_EXIT(msg);
(void)actor->EnqueMessage(std::move(msg));
actor->SetRunningStatus(true);
// Wait actor's thread to finish.
actor->Await();

View File

@ -41,7 +41,7 @@ void ActorWorker::RunWithSpin() {
} else {
YieldAndDeactive();
}
if (spin_count_ >= max_spin_count_) {
if (spin_count_ > max_spin_count_) {
WaitUntilActive();
spin_count_ = 0;
}
@ -58,11 +58,15 @@ bool ActorWorker::RunQueueActorTask() {
return true;
}
bool ActorWorker::Active() {
bool ActorWorker::ActorActive() {
if (status_ != kThreadIdle) {
return false;
}
{
std::lock_guard<std::mutex> _l(mutex_);
active_num_++;
status_ = kThreadBusy;
}
cond_var_.notify_one();
return true;
}
@ -70,6 +74,7 @@ bool ActorWorker::Active() {
ActorThreadPool::~ActorThreadPool() {
// wait until actor queue is empty
bool terminate = false;
int count = 0;
do {
{
#ifdef USE_HQUEUE
@ -80,9 +85,12 @@ ActorThreadPool::~ActorThreadPool() {
#endif
}
if (!terminate) {
for (auto &worker : workers_) {
worker->Active();
}
std::this_thread::yield();
}
} while (!terminate);
} while (!terminate && count++ < kMaxCount);
for (auto &worker : workers_) {
delete worker;
worker = nullptr;
@ -124,7 +132,7 @@ void ActorThreadPool::PushActorToQueue(ActorBase *actor) {
// active one idle actor thread if exist
for (size_t i = 0; i < actor_thread_num_; ++i) {
auto worker = reinterpret_cast<ActorWorker *>(workers_[i]);
if (worker->Active()) {
if (worker->ActorActive()) {
break;
}
}

View File

@ -33,7 +33,7 @@ class ActorThreadPool;
class ActorWorker : public Worker {
public:
void CreateThread(ActorThreadPool *pool);
bool Active();
bool ActorActive();
private:
void RunWithSpin();

View File

@ -66,7 +66,7 @@ void Worker::Run() {
} else {
YieldAndDeactive();
}
if (spin_count_ >= max_spin_count_) {
if (spin_count_ > max_spin_count_) {
WaitUntilActive();
spin_count_ = 0;
}
@ -96,7 +96,8 @@ void Worker::YieldAndDeactive() {
void Worker::WaitUntilActive() {
std::unique_lock<std::mutex> _l(mutex_);
cond_var_.wait(_l, [&] { return status_ == kThreadBusy || !alive_; });
cond_var_.wait(_l, [&] { return status_ == kThreadBusy || active_num_ > 0 || !alive_; });
active_num_--;
}
void Worker::set_scale(float lhs_scale, float rhs_scale) {
@ -114,6 +115,15 @@ void Worker::Active(Task *task, int task_id) {
cond_var_.notify_one();
}
void Worker::Active() {
{
std::lock_guard<std::mutex> _l(mutex_);
active_num_++;
status_ = kThreadBusy;
}
cond_var_.notify_one();
}
bool Worker::available() {
int expected = kThreadIdle;
return status_.compare_exchange_strong(expected, kThreadHeld);
@ -262,6 +272,12 @@ void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task,
}
}
void ThreadPool::ActiveWorkers() const {
for (auto &worker : workers_) {
worker->Active();
}
}
Worker *ThreadPool::CurrentWorker() const {
for (const auto &worker : workers_) {
if (worker->thread_id() == std::this_thread::get_id()) {
@ -332,6 +348,20 @@ void ThreadPool::SetSpinCountMinValue() {
return;
}
void ThreadPool::SetMaxSpinCount(int spin_count) {
if (spin_count <= 0) {
return;
}
max_spin_count_ = spin_count;
}
void ThreadPool::SetMinSpinCount(int spin_count) {
if (spin_count <= 0) {
return;
}
min_spin_count_ = spin_count;
}
ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<int> &core_list) {
ThreadPool *pool = new (std::nothrow) ThreadPool();
if (pool == nullptr) {

View File

@ -30,7 +30,8 @@
namespace mindspore {
constexpr int kDefaultSpinCount = 300000;
constexpr int kMinSpinCount = 3000;
constexpr int kMaxCount = 30000;
constexpr int kMinSpinCount = 1;
constexpr int kDefaultFrequency = 1;
constexpr float kMaxScale = 1.;
@ -61,6 +62,8 @@ class Worker {
void CreateThread();
// assign task and then activate thread
void Active(Task *task, int task_id);
// activate thread
void Active();
// whether or not it is idle and marked as held
bool available();
// assigns task first before running
@ -93,6 +96,7 @@ class Worker {
cpu_set_t mask_;
#endif
std::atomic_int status_{kThreadBusy};
std::atomic_int active_num_{0};
std::mutex mutex_;
std::condition_variable cond_var_;
@ -103,7 +107,7 @@ class Worker {
float rhs_scale_{kMaxScale};
int frequency_{kDefaultFrequency};
int spin_count_{0};
int max_spin_count_{kDefaultSpinCount};
int max_spin_count_{kMinSpinCount};
};
class ThreadPool {
@ -124,8 +128,9 @@ class ThreadPool {
size_t GetKernelThreadNum() const { return kernel_thread_num_; }
void SetSpinCountMaxValue();
void SetSpinCountMinValue();
void SetMaxSpinCount(int spin_count) { max_spin_count_ = spin_count; }
void SetMinSpinCount(int spin_count) { min_spin_count_ = spin_count; }
void SetMaxSpinCount(int spin_count);
void SetMinSpinCount(int spin_count);
void ActiveWorkers() const;
protected:
ThreadPool() = default;

View File

@ -61,7 +61,7 @@ ml_ei_facedetection.onnx 2
#ml_video_edit_art_generate.onnx #mul operator overflows, not suitable for fp16
#ml_voice_detect.onnx #conv operator overflows, not suitable for fp16
#ml_location_lane_counter.onnx has very small values during op computation (<1e-6), which causes the precision variation
ml_location_lane_counter.onnx 7.5
ml_location_lane_counter.onnx 8
ml_location_lane_counter0.onnx 1.0
#The encoder an decoder model are used in ml_asr scene, both have value overflow. Not suitable for fp16.
#But added for guarding process.