added task executor

This commit is contained in:
chendongsheng 2021-04-26 15:14:13 +08:00
parent 81cd26bdc8
commit 3c5bf70a13
3 changed files with 194 additions and 0 deletions

View File

@ -76,6 +76,10 @@ constexpr uint32_t kMaxMessageSize = static_cast<uint32_t>(100 * (uint32_t(1) <<
constexpr char kServerNum[] = "server_num";
constexpr char kWorkerNum[] = "worker_num";
constexpr int64_t kSubmitTaskInterval = 1;
constexpr int64_t kMaxTaskNum = 1024;
constexpr int64_t kSubmitTimeOut = 3000;
using DataPtr = std::shared_ptr<unsigned char[]>;
using VectorPtr = std::shared_ptr<std::vector<unsigned char>>;
using Key = uint64_t;

View File

@ -0,0 +1,88 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/communicator/task_executor.h"
namespace mindspore {
namespace ps {
namespace core {
TaskExecutor::TaskExecutor(size_t thread_num, size_t max_task_num, size_t submit_timeout)
: running_(true),
thread_num_(thread_num),
idle_thread_num_(0),
submit_timeout_(submit_timeout),
max_task_num_(max_task_num),
task_num_(0) {
for (size_t i = 0; i < thread_num; i++) {
working_threads_.emplace_back([this]() {
std::function<void()> task;
while (true) {
std::unique_lock<std::mutex> lock(mtx_);
// Idle thread number increases when the mtx_ is locked.
idle_thread_num_++;
if (!running_) {
// To avoid thread from blocking after destructor.
return;
}
cv_.wait(lock);
if (!running_ || task_queue_.empty()) {
return;
}
task = task_queue_.front();
task_queue_.pop();
if (lock.owns_lock()) {
lock.unlock();
}
task();
}
});
}
notify_thread_ = std::thread([this]() {
// If there is no idle thread, wait until the working thread is available.
while (true) {
{
std::unique_lock<std::mutex> lock(mtx_);
if (idle_thread_num_ > 0 && task_num_ > 0) {
idle_thread_num_--;
task_num_--;
lock.unlock();
cv_.notify_one();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval));
}
});
notify_thread_.detach();
}
TaskExecutor::~TaskExecutor() {
{
std::unique_lock<std::mutex> lock(mtx_);
running_ = false;
}
cv_.notify_all();
for (auto &t : working_threads_) {
t.join();
}
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,102 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TASK_EXECUTOR_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TASK_EXECUTOR_H_
#include <functional>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <condition_variable>
#include "utils/log_adapter.h"
#include "ps/constants.h"
namespace mindspore {
namespace ps {
namespace core {
/* This class can submit tasks in multiple threads
* example:
* void TestTaskExecutor() {
* std::cout << "Execute in one thread";
* }
*
* TaskExecutor executor(10); // 10 threads
* executor.Submit(TestTaskExecutor, this); // Submit task
*/
class TaskExecutor {
public:
explicit TaskExecutor(size_t thread_num, size_t max_task_num = kMaxTaskNum, size_t submit_timeout = kSubmitTimeOut);
~TaskExecutor();
// If the number of submitted tasks is greater than the size of the queue, it will block the submission of subsequent
// tasks unitl timeout.
template <typename Fun, typename... Args>
bool Submit(Fun &&function, Args &&... args) {
auto callee = std::bind(function, args...);
std::function<void()> task = [callee]() -> void { callee(); };
auto index = 0;
for (size_t i = 0; i < submit_timeout_; i++) {
std::unique_lock<std::mutex> lock(mtx_);
if (task_num_ >= max_task_num_) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval));
index++;
} else {
break;
}
}
if (index >= submit_timeout_) {
MS_LOG(WARNING) << "Submit task failed after " << submit_timeout_ << " ms.";
return false;
}
std::unique_lock<std::mutex> lock(mtx_);
task_num_++;
task_queue_.push(task);
return true;
}
private:
bool running_;
// The number of tasks actually running
size_t thread_num_;
// The number of idle threads that can execute tasks
size_t idle_thread_num_;
// The timeout period of the task submission, in milliseconds. default timeout is 3000 milliseconds.
size_t submit_timeout_;
// The maximum number of tasks that can be submitted to the task queue, If the number of submitted tasks exceeds this
// max_task_num_, the Submit function will block.Until the current number of tasks is less than max task num,or
// timeout.
size_t max_task_num_;
// The number of currently submitted to the task queue
size_t task_num_;
std::thread notify_thread_;
std::mutex mtx_;
std::condition_variable cv_;
std::vector<std::thread> working_threads_;
std::queue<std::function<void()>> task_queue_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TASK_EXECUTOR_H_