!24429 optimize the OMP thread compute and actor run error info

Merge pull request !24429 from limingqi107/new_actor_runtime
This commit is contained in:
i-robot 2021-09-30 02:46:19 +00:00 committed by Gitee
commit f06311c5fa
11 changed files with 94 additions and 77 deletions

View File

@ -20,12 +20,12 @@
namespace mindspore {
namespace runtime {
void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t *max_thread_num) {
void ComputeThreadNums(size_t *actor_thread_num, size_t *actor_and_kernel_thread_num) {
MS_EXCEPTION_IF_NULL(actor_thread_num);
MS_EXCEPTION_IF_NULL(OMP_thread_num);
MS_EXCEPTION_IF_NULL(max_thread_num);
MS_EXCEPTION_IF_NULL(actor_and_kernel_thread_num);
size_t cpu_core_num = std::thread::hardware_concurrency() - 1;
const size_t kMaxThreadNum = 23;
// Compute the actor thread num.
const size_t kActorThreadMaxNum = 5;
// The MemoryManagerActor binds single thread, and the other actors share one thread at least, so the min num is 2.
const size_t kActorThreadMinNum = 2;
@ -39,12 +39,12 @@ void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t
*actor_thread_num = *actor_thread_num > kActorThreadMaxNum ? kActorThreadMaxNum : *actor_thread_num;
}
const size_t kOMPThreadMaxNum = 8;
*OMP_thread_num = cpu_core_num < kOMPThreadMaxNum ? cpu_core_num : kOMPThreadMaxNum;
*max_thread_num = cpu_core_num > *actor_thread_num ? cpu_core_num : (*actor_thread_num + 1);
if (*max_thread_num > kMaxThreadNum) {
*max_thread_num = kMaxThreadNum;
}
// Compute the actor and kernel thread num.
const size_t kActorAndKernelThreadMaxNum = 23;
*actor_and_kernel_thread_num = cpu_core_num > *actor_thread_num ? cpu_core_num : (*actor_thread_num + 1);
*actor_and_kernel_thread_num = *actor_and_kernel_thread_num > kActorAndKernelThreadMaxNum
? kActorAndKernelThreadMaxNum
: *actor_and_kernel_thread_num;
}
bool IsDeviceQueueDSActor(const AnfNodePtr &node, GraphExecutionStrategy strategy) {

View File

@ -69,14 +69,14 @@ enum class KernelTransformType {
#define SET_OPCONTEXT_FAIL_RET_WITH_ERROR(op_context, message) \
{ \
MS_LOG(ERROR) << message; \
op_context.SetFailed(kFailure); \
(op_context).error_info_ = message; \
(op_context).SetFailed(kFailure); \
return; \
}
#define SET_OPCONTEXT_SUCCESS_RET(op_context) \
{ \
op_context.SetSuccess(kSuccess); \
(op_context).SetSuccess(kSuccess); \
return; \
}
@ -85,8 +85,8 @@ enum class KernelTransformType {
if (strategy == GraphExecutionStrategy::kStep) { \
MS_LOG(EXCEPTION) << message; \
} \
MS_LOG(ERROR) << message; \
op_context.SetFailed(kFailure); \
(op_context).error_info_ = message; \
(op_context).SetFailed(kFailure); \
return; \
}
@ -98,12 +98,12 @@ enum class KernelTransformType {
if (strategy == GraphExecutionStrategy::kStep) { \
MS_LOG(EXCEPTION) << message; \
} \
MS_LOG(ERROR) << message; \
(op_context).error_info_ = message; \
(op_context).SetFailed(kFailure); \
return; \
}
void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t *max_thread_num);
void ComputeThreadNums(size_t *actor_thread_num, size_t *actor_and_kernel_thread_num);
bool IsDeviceQueueDSActor(const AnfNodePtr &node, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);

View File

@ -124,6 +124,7 @@ void DataPrepareActor::Init() {
void DataPrepareActor::PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_LOG(INFO) << "Data prepare actor(" << GetAID().Name() << ") prepares data.";
// Convert actor running data from input tensors.
if (input_tensors.size() > 0) {

View File

@ -51,12 +51,6 @@ class DataSourceActor : public DebugAwareActor {
// The process entry of data processing.
void FetchData(OpContext<DeviceTensor> *const context);
// The memory related operation interface.
void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override{};
void SendMemoryFreeReq(OpContext<DeviceTensor> *const context) override{};
// Copy data from data source to the device tensor buffer of actor after memory alloc finished.
void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override{};
protected:
friend class GraphScheduler;
@ -90,8 +84,10 @@ class DeviceQueueDataSourceActor : public DataSourceActor {
void Init() override;
// The memory related operation interface.
void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
void SendMemoryFreeReq(OpContext<DeviceTensor> *const context) override;
// Copy data from data source to the device tensor buffer of actor after memory alloc finished.
void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;
void SendDebugReq(OpContext<DeviceTensor> *const context) override;
@ -122,8 +118,10 @@ class HostQueueDataSourceActor : public DataSourceActor {
host_queue_(host_queue) {}
~HostQueueDataSourceActor() override = default;
// The memory related operation interface.
void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
void SendMemoryFreeReq(OpContext<DeviceTensor> *const context) override;
// Copy data from data source to the device tensor buffer of actor after memory alloc finished.
void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;
size_t FetchNodePosition(const AnfNodePtr &node) const override;

View File

@ -39,7 +39,18 @@ void SuperKernelActor::RunOpData(OpData<DeviceTensor> *const input_data, OpConte
auto &sequential_num = context->sequential_num_;
(void)input_op_datas_[sequential_num].emplace_back(input_data);
if (CheckRunningCondition(context)) {
device_contexts_[0]->LaunchGraph(graph_);
MS_LOG(INFO) << "Super kernel actor(" << GetAID().Name() << ") launches graph: " << graph_->graph_id();
try {
auto ret = device_contexts_[0]->LaunchGraph(graph_);
if (!ret) {
std::string error_info = "Launch graph failed, graph id: " + graph_->graph_id();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
} catch (const std::exception &e) {
MsException::Instance().SetException();
std::string error_info = "Launch graph failed, graph id: " + graph_->graph_id();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
// The input is invalid and needs to be erased when finish kernel launch.
EraseInput(context);
@ -54,7 +65,18 @@ void SuperKernelActor::RunOpControl(AID *const input_control, OpContext<DeviceTe
auto &sequential_num = context->sequential_num_;
(void)input_op_controls_[sequential_num].emplace_back(input_control);
if (CheckRunningCondition(context)) {
device_contexts_[0]->LaunchGraph(graph_);
MS_LOG(INFO) << "Super kernel actor(" << GetAID().Name() << ") launches graph: " << graph_->graph_id();
try {
auto ret = device_contexts_[0]->LaunchGraph(graph_);
if (!ret) {
std::string error_info = "Launch graph failed, graph id: " + graph_->graph_id();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
} catch (const std::exception &e) {
MsException::Instance().SetException();
std::string error_info = "Launch graph failed, graph id: " + graph_->graph_id();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
// The input is invalid and needs to be erased when finish kernel launch.
EraseInput(context);

View File

@ -17,26 +17,17 @@
#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_SUPER_KERNEL_ACTOR_H_
#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_SUPER_KERNEL_ACTOR_H_
#include <vector>
#include <string>
#include <memory>
#include <utility>
#include <unordered_map>
#include "runtime/framework/actor/actor_common.h"
#include "runtime/framework/actor/debug_aware_actor.h"
#include "runtime/framework/actor/actor_common.h"
#include "runtime/hardware/device_context.h"
#include "runtime/framework/device_tensor_store.h"
#include "backend/kernel_compiler/kernel.h"
#include "ir/anf.h"
#include "ir/tensor.h"
namespace mindspore {
namespace runtime {
using mindspore::device::DeviceContext;
using mindspore::device::KernelInfo;
using mindspore::kernel::Address;
using mindspore::kernel::KernelLaunchInfo;
using mindspore::tensor::TensorPtr;
// The Super kernel actor is used to represent the sink executing of graph which is the combination of kernels.
class SuperKernelActor : public DebugAwareActor {

View File

@ -35,6 +35,7 @@
#endif
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/recorder_manager.h"
#include "debug/rdr/running_data_recorder.h"
#endif
#ifdef ENABLE_DEBUGGER
#include "debug/debugger/debugger.h"
@ -77,6 +78,10 @@ std::vector<ActorReference> CollectActors(const ActorSet *actor_set) {
MS_EXCEPTION_IF_NULL(kernel_actor);
(void)actors.emplace_back(static_cast<ActorReference>(kernel_actor));
}
for (auto &super_kernel_actor : actor_set->super_kernel_actors_) {
MS_EXCEPTION_IF_NULL(super_kernel_actor);
(void)actors.emplace_back(static_cast<ActorReference>(super_kernel_actor));
}
for (auto &switch_actor : actor_set->switch_actors_) {
MS_EXCEPTION_IF_NULL(switch_actor);
(void)actors.emplace_back(static_cast<ActorReference>(switch_actor));
@ -216,17 +221,15 @@ void GraphScheduler::Initialize() {
// Create the thread pool of actor runtime and Set the OMP_NUM_THREADS env.
size_t actor_thread_num = 0;
size_t OMP_thread_num = 0;
size_t max_thread_num = 0;
ComputeThreadNums(&actor_thread_num, &OMP_thread_num, &max_thread_num);
size_t actor_and_kernel_thread_num = 0;
ComputeThreadNums(&actor_thread_num, &actor_and_kernel_thread_num);
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);
actor_manager->Initialize(true, actor_thread_num, max_thread_num);
std::string OMP_env = std::to_string(OMP_thread_num);
(void)common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0);
actor_manager->Initialize(true, actor_thread_num, actor_and_kernel_thread_num);
(void)common::SetOMPThreadNum();
auto OMP_thread_num_used = common::GetEnv("OMP_NUM_THREADS");
MS_LOG(INFO) << "The actor thread number: " << actor_thread_num
<< ", the computed OMP thread number : " << OMP_thread_num
<< ", the kernel thread number: " << (actor_and_kernel_thread_num - actor_thread_num)
<< ", the used OMP thread number: " << OMP_thread_num_used;
BuildAndScheduleGlobalActor();
@ -315,7 +318,7 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
}
}
bool GraphScheduler::Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
void GraphScheduler::Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
const std::vector<TensorPtr> &input_tensors_with_value_node, GraphExecutionStrategy strategy) {
MS_EXCEPTION_IF_NULL(actor_set);
MS_EXCEPTION_IF_NULL(actor_set->data_prepare_actor_);
@ -334,7 +337,7 @@ bool GraphScheduler::Run(const ActorSet *actor_set, const std::vector<std::vecto
actor_set->data_prepare_actor_->PrepareData(input_tensors, &op_context);
MS_EXCEPTION_IF_NULL(actor_set->kernel_actors_[0]);
actor_set->kernel_actors_[0]->RunOpControlWithInputTensor(nullptr, &op_context, &input_tensors_with_value_node);
return true;
return;
}
// Trigger data prepare actor running.
@ -344,7 +347,12 @@ bool GraphScheduler::Run(const ActorSet *actor_set, const std::vector<std::vecto
auto result_future = result[0].GetFuture();
result_future.Wait();
MsException::Instance().CheckException();
return result_future.IsOK();
if (!result_future.IsOK()) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
MS_LOG(EXCEPTION) << op_context.error_info_;
}
}
ActorSet *GraphScheduler::Fetch(const ActorInfo &actor_info) const {
@ -1883,7 +1891,7 @@ void GraphScheduler::DumpAbstractActor(const AbstractActor *actor, std::ofstream
MS_EXCEPTION_IF_NULL(result_arrow);
MS_EXCEPTION_IF_NULL(output_node);
ofs << "\t\t\tfrom_output_node:" << output_node->fullname_with_scope()
<< "tfrom_output_index:" << result_arrow->from_output_index_
<< "\tfrom_output_index:" << result_arrow->from_output_index_
<< "\tto_actor_name:" << result_arrow->to_op_id_.Name()
<< "\toutput_node_position:" << result_arrow->to_input_index_ << "\n";
}
@ -1990,7 +1998,7 @@ void GraphScheduler::DumpSuperKernelActor(const SuperKernelActor *actor, std::of
const auto &graph = actor->graph_;
MS_EXCEPTION_IF_NULL(graph);
ofs << "\t\tgraph id:" << graph->graph_id() << "\tgraphl_name:" << graph->ToString()
ofs << "\t\tgraph_id:" << graph->graph_id() << "\tgraphl_name:" << graph->ToString()
<< "\tis_sink:" << graph->is_sink() << "\tinputs_num:" << (graph->input_nodes()).size()
<< "\tkernels_num:" << (graph->execution_order()).size() << "\n";
@ -2035,7 +2043,7 @@ void GraphScheduler::DumpCopyActor(const CopyActor *actor, std::ofstream &ofs) c
void GraphScheduler::DumpDeviceTensorStore(const GraphCompilerInfo &graph_compiler_info, std::ofstream &ofs) const {
for (const auto &graph : graph_compiler_info.graphs_) {
MS_EXCEPTION_IF_NULL(graph);
ofs << "\tgraph id:" << graph->graph_id() << "\tis_sink:" << graph->is_sink() << "\n";
ofs << "\tgraph_id:" << graph->graph_id() << "\tis_sink:" << graph->is_sink() << "\n";
for (auto &value_node : graph->graph_value_nodes()) {
MS_EXCEPTION_IF_NULL(value_node);

View File

@ -105,7 +105,7 @@ class GraphScheduler {
void Schedule(const ActorSet *actor_set);
// The processing entry of actors running. The third parameter is used only in the step execution strategy.
bool Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
void Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
const std::vector<TensorPtr> &input_tensors_with_value_node = {},
GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);

View File

@ -42,9 +42,6 @@
#ifndef ENABLE_SECURITY
#include "debug/data_dump/dump_json_parser.h"
#endif
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/running_data_recorder.h"
#endif
namespace mindspore {
namespace compile {
@ -846,12 +843,7 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, const VectorRef &args,
mindspore::ScopedLongRunning long_running;
const auto &actor_set = runtime::GraphScheduler::GetInstance().Fetch(actor_info);
MS_EXCEPTION_IF_NULL(actor_set);
if (!runtime::GraphScheduler::GetInstance().Run(actor_set, input_tensors)) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
MS_LOG(EXCEPTION) << "The actor runs failed, actor name: " << actor_set->name_;
}
runtime::GraphScheduler::GetInstance().Run(actor_set, input_tensors);
if (graph_compiler_info.device_contexts_.empty()) {
MS_LOG(EXCEPTION) << "The device contexts is empty.";
@ -1057,15 +1049,6 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, OpRunInfo *op_run_info
MS_EXCEPTION_IF_NULL(input_tensors);
MS_EXCEPTION_IF_NULL(op_run_info);
MS_EXCEPTION_IF_NULL(tensors_mask);
const auto &graph_iter = actor_to_graph_compiler_info_.find(actor_info);
if (graph_iter == actor_to_graph_compiler_info_.end()) {
MS_LOG(EXCEPTION) << "Can't find the graph compiler info.";
}
MS_EXCEPTION_IF_NULL(graph_iter->second);
const auto &graph_compiler_info = *(graph_iter->second);
const auto &actor_set = runtime::GraphScheduler::GetInstance().Fetch(actor_info);
MS_EXCEPTION_IF_NULL(actor_set);
// Erase value node tensor.
std::vector<tensor::TensorPtr> tensors_without_value_node;
@ -1086,12 +1069,19 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, OpRunInfo *op_run_info
}
}
if (!runtime::GraphScheduler::GetInstance().Run(actor_set, {tensors_without_value_node}, *input_tensors,
runtime::GraphExecutionStrategy::kStep)) {
MS_LOG(EXCEPTION) << "The actor runs failed, actor name: " << actor_set->name_;
}
// Run actor DAG.
const auto &actor_set = runtime::GraphScheduler::GetInstance().Fetch(actor_info);
MS_EXCEPTION_IF_NULL(actor_set);
runtime::GraphScheduler::GetInstance().Run(actor_set, {tensors_without_value_node}, *input_tensors,
runtime::GraphExecutionStrategy::kStep);
// Fetch outputs.
const auto &graph_iter = actor_to_graph_compiler_info_.find(actor_info);
if (graph_iter == actor_to_graph_compiler_info_.end()) {
MS_LOG(EXCEPTION) << "Can't find the graph compiler info.";
}
MS_EXCEPTION_IF_NULL(graph_iter->second);
const auto &graph_compiler_info = *(graph_iter->second);
const auto &graph = graph_compiler_info.graphs_.front();
MS_EXCEPTION_IF_NULL(graph);
MS_EXCEPTION_IF_NULL(graph_compiler_);

View File

@ -60,6 +60,8 @@ struct OpContext {
uuids::uuid *sequential_num_;
std::vector<OpDataPtr<T>> *output_data_;
std::vector<Promise<int>> *results_;
// Record the error info for print.
std::string error_info_{""};
const void *kernel_call_back_before_;
const void *kernel_call_back_after_;

View File

@ -51,12 +51,17 @@ static inline int SetEnv(const char *envname, const char *envvar, int overwrite
}
static inline void SetOMPThreadNum() {
size_t cpu_core_num = std::thread::hardware_concurrency();
size_t cpu_core_num_half = cpu_core_num / 2;
const size_t kOMPThreadMaxNum = 16;
const size_t kOMPThreadMinNum = 1;
// The actor concurrent execution max num.
const size_t kActorConcurrentMaxNum = 4;
size_t OMP_thread_num = cpu_core_num_half < kOMPThreadMinNum ? kOMPThreadMinNum : cpu_core_num_half;
size_t cpu_core_num = std::thread::hardware_concurrency();
size_t cpu_core_num_half = cpu_core_num / 2;
// Ensure that the calculated number of OMP threads is at most half the number of CPU cores.
size_t OMP_thread_num = cpu_core_num_half / kActorConcurrentMaxNum;
OMP_thread_num = OMP_thread_num < kOMPThreadMinNum ? kOMPThreadMinNum : OMP_thread_num;
OMP_thread_num = OMP_thread_num > kOMPThreadMaxNum ? kOMPThreadMaxNum : OMP_thread_num;
std::string OMP_env = std::to_string(OMP_thread_num);