forked from mindspore-Ecosystem/mindspore

fix the issue of parallel executor

parent 75af54647f
commit 95b3f4fbfe
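The diff touches src/runtime/parallel_executor.cc and its header. It drops the PrepareReadyKernels() rescanning pass and instead tracks, per kernel, how many producer kernels (in_kernels()) have not finished yet: each iteration of Run() launches the current wave of ready kernels on the thread pool, then decrements the counts of their consumers and collects the newly unblocked kernels for the next wave.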
src/runtime/parallel_executor.cc

```diff
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include <utility>
 #include "src/runtime/parallel_executor.h"
 using mindspore::predict::ThreadPool;
 using mindspore::predict::TvmEnv;
```
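`<utility>` is pulled in for the std::move() call that the rewritten Run() below uses to hand the next wave of kernels to readyKernels.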
```diff
@@ -25,25 +26,15 @@ ParallelExecutor::~ParallelExecutor() {
 }
 int ParallelExecutor::Prepare(std::vector<mindspore::kernel::LiteKernel *> &kernels) {
   pool = new ThreadPool();
   if (pool == nullptr) {
     MS_LOG(ERROR) << "Memory error: fail to new ThreadPool";
     return RET_ERROR;
   }
   pool->ConfigMaxThreadNum(MAX_THREAD_NUM);
   pool->ConfigThreadPool(NO_BIND, MAX_THREAD_NUM);
-  for (mindspore::kernel::LiteKernel *kernel : kernels) {
-    refCount[kernel] = kernel->out_kernels().size();
-  }
   return RET_OK;
 }
-
-void ParallelExecutor::PrepareReadyKernels(const std::vector<mindspore::kernel::LiteKernel *> &kernels) {
-  for (auto iter = refCount.begin(); iter != refCount.end();) {
-    if (iter->second == 0) {
-      readyKernels.emplace_back(iter->first);
-      iter = refCount.erase(iter);
-    } else {
-      iter++;
-    }
-  }
-  results.resize(readyKernels.size());
-}
-
 static int RunKernel(int index, TvmEnv *env, void *data) {
   ParallelExecutor *executor = reinterpret_cast<ParallelExecutor *>(data);
   auto kernel = executor->GetReadyKernel(index);
```
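Prepare() now only sets up the thread pool: the dependency bookkeeping moves into Run(), and PrepareReadyKernels() is deleted (its declaration also disappears from the header in the last hunk). Notably, the old code seeded refCount from kernel->out_kernels().size(), a consumer count, even though readiness depends on how many producers are still pending; the rewritten Run() counts in_kernels() instead, which matches the "not-yet-executed producers" semantics the scheduler needs.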
```diff
@@ -83,27 +74,49 @@ int ParallelExecutor::Run(std::vector<tensor::Tensor *> &in_tensors, std::vector
   }
   kernel::LiteKernelUtil::InitTensorRefCount(kernels);
 
-  PrepareReadyKernels(kernels);
+  for (auto kernel : kernels) {
+    if (kernel->in_kernels().size() == 0) {
+      readyKernels.emplace_back(kernel);
+      continue;
+    }
+    refCount[kernel] = kernel->in_kernels().size();
+  }
+  std::vector<kernel::LiteKernel *> newReadyKernels;
   while (readyKernels.size() > 0) {
+    results.resize(readyKernels.size(), RET_OK);
     pool->LaunchWork(RunKernel, this, readyKernels.size());
 
     if (std::find_if(results.begin(), results.end(), [](const int &ret) { return (ret != 0); }) != results.end()) {
       return RET_ERROR;
     }
-    for (auto completedKernel : readyKernels) {
-      for (auto out : completedKernel->out_kernels()) {
+    newReadyKernels.clear();
+    for (auto completed : readyKernels) {
+      for (auto out : completed->out_kernels()) {
         auto iter = refCount.find(out);
         if (iter == refCount.end()) {
           continue;
         }
         (iter->second)--;
         if (iter->second <= 0) {
+          newReadyKernels.emplace_back(iter->first);
           refCount.erase(iter);
         }
       }
 
       for (auto input_kernel : completed->in_kernels()) {
         MS_ASSERT(input_kernel != nullptr);
         if (input_kernel->is_model_output()) {
           continue;
         }
         auto ret = input_kernel->DecOutTensorRefCount();
         if (0 != ret) {
           MS_LOG(WARNING) << "DecOutTensorRefCount for kernel" << completed->name() << " failed";
           return -1;
         }
       }
     }
     readyKernels.clear();
-    PrepareReadyKernels(kernels);
+    readyKernels = std::move(newReadyKernels);
   }
 
   return RET_OK;
```
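The rewritten Run() is a level-synchronous, Kahn-style traversal of the kernel graph: seed the ready set with kernels that have no inputs, execute each wave in parallel, then decrement the pending-producer count of every consumer and promote the ones that reach zero. Below is a minimal standalone sketch of that pattern; the `Node` type is a hypothetical stand-in for kernel::LiteKernel, and a plain sequential loop stands in for pool->LaunchWork().

```cpp
// Minimal sketch of the wave-based scheduling pattern used in the diff above.
#include <unordered_map>
#include <utility>
#include <vector>

struct Node {
  std::vector<Node *> in_nodes;   // producers this node waits on
  std::vector<Node *> out_nodes;  // consumers unblocked when this node finishes
  void Run() { /* kernel computation would happen here */ }
};

void RunGraph(const std::vector<Node *> &nodes) {
  std::unordered_map<Node *, size_t> refCount;  // pending-producer count per node
  std::vector<Node *> ready;
  for (Node *node : nodes) {
    if (node->in_nodes.empty()) {
      ready.emplace_back(node);  // source nodes form the first wave
    } else {
      refCount[node] = node->in_nodes.size();
    }
  }
  std::vector<Node *> nextReady;
  while (!ready.empty()) {
    for (Node *node : ready) {  // the real executor runs this wave on a thread pool
      node->Run();
    }
    nextReady.clear();
    for (Node *done : ready) {
      for (Node *out : done->out_nodes) {
        auto iter = refCount.find(out);
        if (iter == refCount.end()) {
          continue;
        }
        if (--iter->second == 0) {  // last pending producer just finished
          nextReady.emplace_back(out);
          refCount.erase(iter);
        }
      }
    }
    ready = std::move(nextReady);  // promote the newly unblocked wave
  }
}
```

Collecting newly unblocked kernels into a separate buffer (the diff's newReadyKernels) instead of re-scanning the whole refCount map each round is what lets the loop drop PrepareReadyKernels(); it also guarantees each kernel is enqueued exactly once, at the moment its last pending input completes.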
src/runtime/parallel_executor.h

```diff
@@ -39,9 +39,6 @@ class ParallelExecutor : public Executor {
   inline kernel::LiteKernel *GetReadyKernel(const int index) { return readyKernels.at(index); }
   inline void SetResult(const int index, const int result) { results.at(index) = result; }
 
- private:
-  void PrepareReadyKernels(const std::vector<kernel::LiteKernel *> &kernels);
-
  private:
   predict::ThreadPool *pool;
   std::unordered_map<kernel::LiteKernel *, size_t> refCount;
```