!41007 kernel actor supports the somas for integration of dynamic and static memory

Merge pull request !41007 from limingqi107/new_actor_runtime
i-robot 2022-08-29 01:24:43 +00:00 committed by Gitee
commit 0d6581c54f
9 changed files with 259 additions and 87 deletions
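
At a glance, the change renames KernelInfo's somas getters (somas_output_offset_aligned_size_list / somas_workspace_offset_aligned_size_list become somas_output_result / somas_workspace_result), adds the IsTensorEnableSomas and IsSomasEnable gates keyed to the memory optimize level, splits KernelActor::Init into InitInputInfo, InitOutputInfo and InitWorkspaceInfo, sets somas-planned addresses in SetSomasMemory before dynamic allocation runs, and brackets kernel launches with Begin/End debug logs. A somas-planned tensor is addressed as an offset into one pre-allocated block; the minimal sketch below shows that addressing model using hypothetical names (SomasPlan, resolve) rather than the real KernelInfo/SomasInfo types:

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for the {offset, aligned_size} pairs stored in
// KernelInfo::somas_output_result() / somas_workspace_result().
struct SomasPlan {
  void *base_address = nullptr;                    // the whole-block allocation
  std::vector<std::pair<size_t, size_t>> tensors;  // {offset, aligned_size}

  // A tensor participates in somas iff its planned aligned size is non-zero.
  bool enabled(size_t index) const {
    return index < tensors.size() && tensors[index].second != 0;
  }

  // Resolve a tensor address as base + offset, as SetSomasMemory does below.
  void *resolve(size_t index) const {
    if (!enabled(index)) return nullptr;  // falls back to dynamic allocation
    return static_cast<uint8_t *>(base_address) + tensors[index].first;
  }
};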

View File

@@ -375,7 +375,6 @@ bool AscendKernelExecutor::LaunchKernel(const CNodePtr &kernel, const vector<Add
auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
KernelType kernel_type = AnfAlgo::GetKernelType(kernel);
MS_EXCEPTION_IF_NULL(kernel);
MS_LOG(DEBUG) << "Launch kernel: " << kernel->fullname_with_scope();
(void)res_manager_->BindDeviceToCurrentThread();
std::vector<AddressPtr> real_inputs;
@@ -407,7 +406,6 @@ bool AscendKernelExecutor::LaunchKernel(const CNodePtr &kernel, const vector<Add
if (nop_op_to_memcpy_.find(kernel) != nop_op_to_memcpy_.end()) {
(void)MemoryCopyAsync(kernel, real_inputs, outputs);
} else {
MS_LOG(DEBUG) << "Launch kernel " << kernel->fullname_with_scope();
#ifndef ENABLE_SECURITY
auto profiler_inst = profiler::ascend::PynativeProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);
@@ -415,7 +413,9 @@ bool AscendKernelExecutor::LaunchKernel(const CNodePtr &kernel, const vector<Add
profiler_inst->OpDataProducerBegin(res_manager_->runtime_instance_, stream, t_id, kernel->fullname_with_scope(),
is_dynamic_shape);
#endif
MS_LOG(DEBUG) << "Begin launch kernel: " << kernel->fullname_with_scope();
ret = kernel_mod->Launch(real_inputs, workspace, outputs, stream);
MS_LOG(DEBUG) << "End launch kernel: " << kernel->fullname_with_scope();
#ifndef ENABLE_SECURITY
profiler_inst->OpDataProducerEnd(t_id, is_dynamic_shape);
#endif
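
The Begin/End pair brackets the device launch so a hang or failure can be attributed to a specific kernel in the debug log. Purely as an illustration (the patch itself uses explicit MS_LOG statements, which also carry logger context), the same bracketing could be expressed as an RAII guard; all names here are ours:

#include <iostream>
#include <string>
#include <utility>

// Illustrative RAII bracket: logs "Begin" on construction and "End" on
// destruction, so the "End" line is emitted even on early return.
class LaunchLogScope {
 public:
  explicit LaunchLogScope(std::string name) : name_(std::move(name)) {
    std::clog << "Begin launch kernel: " << name_ << std::endl;
  }
  ~LaunchLogScope() { std::clog << "End launch kernel: " << name_ << std::endl; }

 private:
  std::string name_;
};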

View File

@@ -340,7 +340,6 @@ bool CPUKernelExecutor::LaunchKernel(const CNodePtr &kernel, const std::vector<A
const std::vector<AddressPtr> &workspace, const std::vector<AddressPtr> &outputs,
size_t /* stream_id */) const {
MS_EXCEPTION_IF_NULL(kernel);
MS_LOG(DEBUG) << "Launch kernel: " << kernel->fullname_with_scope();
auto kernel_mod = AnfAlgo::GetKernelMod(kernel);
MS_EXCEPTION_IF_NULL(kernel_mod);
@@ -355,10 +354,16 @@ bool CPUKernelExecutor::LaunchKernel(const CNodePtr &kernel, const std::vector<A
const auto &profiler_inst = profiler::cpu::CPUProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);
if (profiler_inst->GetEnableFlag()) {
return LaunchKernelWithProfiling(kernel, inputs, workspace, outputs);
MS_LOG(DEBUG) << "Begin launch kernel: " << kernel->fullname_with_scope();
auto ret = LaunchKernelWithProfiling(kernel, inputs, workspace, outputs);
MS_LOG(DEBUG) << "End launch kernel: " << kernel->fullname_with_scope();
return ret;
}
#endif
return DoLaunchKernel(kernel_mod, inputs, workspace, outputs);
MS_LOG(DEBUG) << "Begin launch kernel: " << kernel->fullname_with_scope();
auto ret = DoLaunchKernel(kernel_mod, inputs, workspace, outputs);
MS_LOG(DEBUG) << "End launch kernel: " << kernel->fullname_with_scope();
return ret;
}
bool CPUDeviceResManager::LoadCollectiveCommLib() {

View File

@@ -599,13 +599,15 @@ bool GPUKernelExecutor::LaunchKernel(const CNodePtr &kernel, const std::vector<A
if (!profiler_inst->GetEnableFlag()) {
#endif
auto lock = LockLaunchKernel(stream);
MS_LOG(DEBUG) << "Launch kernel: " << kernel->fullname_with_scope();
MS_LOG(DEBUG) << "Begin launch kernel: " << kernel->fullname_with_scope();
ret = DoLaunchKernel(kernel, inputs, workspace, outputs, stream);
MS_LOG(DEBUG) << "End launch kernel: " << kernel->fullname_with_scope();
#ifndef ENABLE_SECURITY
} else {
auto lock = LockLaunchKernel(stream);
MS_LOG(DEBUG) << "Launch kernel: " << kernel->fullname_with_scope();
MS_LOG(DEBUG) << "Begin launch kernel: " << kernel->fullname_with_scope();
ret = LaunchKernelWithProfiling(kernel, inputs, workspace, outputs, stream);
MS_LOG(DEBUG) << "End launch kernel: " << kernel->fullname_with_scope();
}
#endif
if (!ret) {

View File

@@ -53,11 +53,11 @@ uint8_t *CommonSomasAllocator::GetNodeOutputPtr(const AnfNodePtr &node, size_t i
MS_EXCEPTION_IF_NULL(node);
auto kernel_info = dynamic_cast<KernelInfo *>(node->kernel_info());
MS_EXCEPTION_IF_NULL(kernel_info);
if (index >= kernel_info->somas_output_offset_aligned_size_list().size()) {
if (index >= kernel_info->somas_output_result().size()) {
MS_LOG(EXCEPTION) << "index:[" << index << "] is larger than it's output size:["
<< kernel_info->somas_output_offset_aligned_size_list().size() << "]";
<< kernel_info->somas_output_result().size() << "]";
}
auto somas_offset_aligned_size = kernel_info->somas_output_offset_aligned_size_list()[index];
auto somas_offset_aligned_size = kernel_info->somas_output_result()[index];
if (somas_offset_aligned_size.second == 0) {
return nullptr;
}
@@ -70,11 +70,11 @@ uint8_t *CommonSomasAllocator::GetNodeWorkSpacePtr(const AnfNodePtr &node, size_
MS_EXCEPTION_IF_NULL(node);
auto kernel_info = dynamic_cast<KernelInfo *>(node->kernel_info());
MS_EXCEPTION_IF_NULL(kernel_info);
if (index >= kernel_info->somas_workspace_offset_aligned_size_list().size()) {
if (index >= kernel_info->somas_workspace_result().size()) {
MS_LOG(EXCEPTION) << "index:[" << index << "] is larger than it's output size:["
<< kernel_info->somas_workspace_offset_aligned_size_list().size() << "]";
<< kernel_info->somas_workspace_result().size() << "]";
}
auto somas_offset_aligned_size = kernel_info->somas_workspace_offset_aligned_size_list()[index];
auto somas_offset_aligned_size = kernel_info->somas_workspace_result()[index];
if (somas_offset_aligned_size.second == 0) {
return nullptr;
}
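
Both the old somas_*_offset_aligned_size_list() getters and the new somas_*_result() names return the same std::vector<std::pair<size_t, size_t>> of {offset, aligned_size} entries; the rename only shortens the call sites. A minimal sketch of the lookup GetNodeOutputPtr and GetNodeWorkSpacePtr perform, where the base pointer parameter is our own stand-in for the allocator's block start:

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// An entry is {offset, aligned_size}; aligned_size == 0 means the tensor is
// not somas-managed and the caller falls back to the dynamic allocator.
uint8_t *ResolveSomasPtr(const std::vector<std::pair<size_t, size_t>> &result,
                         size_t index, uint8_t *base) {
  const auto &entry = result.at(index);  // out of range throws, like MS_LOG(EXCEPTION)
  if (entry.second == 0) {
    return nullptr;
  }
  return base + entry.first;
}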

View File

@@ -16,6 +16,7 @@
#include "runtime/device/kernel_info.h"
#include <utility>
#include "utils/ms_context.h"
namespace mindspore {
namespace device {
@@ -116,6 +117,25 @@ bool KernelInfo::SetSomasResult(std::vector<std::pair<size_t, size_t>> &&output_
return true;
}
bool KernelInfo::IsTensorEnableSomas(const std::vector<std::pair<size_t, size_t>> &somas_result,
size_t tensor_index) const {
// Somas is currently disabled in the runtime when the memory optimize level is O0 or O1.
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
if ((ms_context->get_param<int>(MS_CTX_MEMORY_OPTIMIZE_LEVEL) == kOptimizeO0) ||
(ms_context->get_param<int>(MS_CTX_MEMORY_OPTIMIZE_LEVEL) == kOptimizeO1)) {
return false;
}
if (somas_result.empty()) {
return false;
}
if (tensor_index >= somas_result.size()) {
MS_LOG(EXCEPTION) << "The tensor index:" << tensor_index << " is out of range:" << somas_result.size();
}
return (somas_result[tensor_index].second != 0);
}
void KernelInfo::set_kernel_mod(const kernel::KernelModPtr &kernel_mod) { kernel_mod_ = kernel_mod; }
kernel::KernelMod *KernelInfo::MutableKernelMod() const { return kernel_mod_.get(); }
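
IsTensorEnableSomas treats a zero aligned size as "not planned by somas", and is additionally gated by the memory optimize level. Below is a pure-function replica of that contract for illustration; unlike the real method, it returns false instead of throwing on an out-of-range index and omits the MsContext check:

#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

bool TensorEnableSomas(const std::vector<std::pair<size_t, size_t>> &result,
                       size_t index) {
  if (result.empty() || index >= result.size()) {
    return false;
  }
  return result[index].second != 0;  // zero aligned size: not planned by somas
}

int main() {
  // Made-up plan: one {offset, aligned_size} pair per tensor.
  std::vector<std::pair<size_t, size_t>> somas = {{0, 512}, {0, 0}, {512, 256}};
  assert(TensorEnableSomas(somas, 0));   // planned at offset 0, size 512
  assert(!TensorEnableSomas(somas, 1));  // size 0: dynamically allocated
  assert(TensorEnableSomas(somas, 2));   // planned at offset 512, size 256
  return 0;
}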

View File

@@ -58,8 +58,6 @@ class KernelInfo : public KernelInfoDevice {
DeviceAddressPtr GetMutableWorkspaceAddr(size_t index) const;
bool WorkspaceAddrExist(size_t index) const;
bool SetWorkspaceAddr(const DeviceAddressPtr &output_address, size_t index);
bool SetSomasResult(std::vector<std::pair<size_t, size_t>> &&output_somas_result,
std::vector<std::pair<size_t, size_t>> &&workspace_somas_result);
void set_kernel_mod(const kernel::KernelModPtr &kernel_mod);
kernel::KernelMod *MutableKernelMod() const;
const kernel::KernelMod *kernel_mod() const;
@@ -73,12 +71,7 @@ class KernelInfo : public KernelInfoDevice {
uint32_t graph_id() const { return graph_id_; }
bool operator==(const KernelInfo &other) const;
bool is_feature_map() const { return is_feature_map_; }
const std::vector<std::pair<size_t, size_t>> &somas_output_offset_aligned_size_list() const {
return somas_output_result_;
}
const std::vector<std::pair<size_t, size_t>> &somas_workspace_offset_aligned_size_list() const {
return somas_workspace_result_;
}
const std::vector<std::shared_ptr<DeviceAddress>> &output_address_list() const { return output_address_list_; }
const std::vector<std::shared_ptr<DeviceAddress>> &workspace_address_list() const { return workspace_address_list_; }
@@ -87,6 +80,13 @@ class KernelInfo : public KernelInfoDevice {
void set_ref_map(const bool &all_ref, const OutputInputRefMap &ref_map);
const OutputInputRefMap &out_in_ref_map() const { return out_in_ref_map_; }
// The interface of somas.
bool SetSomasResult(std::vector<std::pair<size_t, size_t>> &&output_somas_result,
std::vector<std::pair<size_t, size_t>> &&workspace_somas_result);
bool IsTensorEnableSomas(const std::vector<std::pair<size_t, size_t>> &somas_result, size_t tensor_index) const;
const std::vector<std::pair<size_t, size_t>> &somas_output_result() const { return somas_output_result_; }
const std::vector<std::pair<size_t, size_t>> &somas_workspace_result() const { return somas_workspace_result_; }
private:
bool is_feature_map_;
kernel::KernelBuildInfoPtr select_kernel_build_info_;

View File

@@ -205,17 +205,31 @@ void DumpKernelActor(const KernelActor *actor, std::ofstream &ofs) {
const auto &kernel = actor->kernel();
MS_EXCEPTION_IF_NULL(kernel);
auto kernel_info = dynamic_cast<KernelInfo *>(kernel->kernel_info());
MS_EXCEPTION_IF_NULL(kernel_info);
ofs << "\t\tkernel_name:" << kernel->fullname_with_scope()
<< "\tinputs_num:" << common::AnfAlgo::GetInputTensorNum(kernel)
<< "\toutputs_num:" << common::AnfAlgo::GetOutputTensorNum(kernel)
<< "\tis_dynamic_shape:" << actor->is_dynamic_shape() << "\tis_launch_skipped:" << actor->is_launch_skipped()
<< "\n";
const auto &somas_outputs = kernel_info->somas_output_result();
for (size_t i = 0; i < common::AnfAlgo::GetOutputTensorNum(kernel); ++i) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(kernel, i, false);
MS_EXCEPTION_IF_NULL(device_tensor);
ofs << "\t\t\toutput_index:" << i << "\tptr:" << device_tensor->GetPtr() << "\tsize:" << device_tensor->GetSize()
<< "\toriginal_ref_count:" << device_tensor->original_ref_count()
<< "\tdynamic_ref_count:" << device_tensor->dynamic_ref_count() << "\n ";
<< "\tdynamic_ref_count:" << device_tensor->dynamic_ref_count()
<< "\tis_somas_enable:" << kernel_info->IsTensorEnableSomas(somas_outputs, i) << "\n ";
}
const auto &somas_workspace = kernel_info->somas_workspace_result();
const auto &workspace_addresses = kernel_info->workspace_address_list();
for (size_t i = 0; i < workspace_addresses.size(); ++i) {
auto &device_tensor = workspace_addresses[i];
MS_EXCEPTION_IF_NULL(device_tensor);
ofs << "\t\t\tworkspace_index:" << i << "\tptr:" << device_tensor->GetPtr() << "\tsize:" << device_tensor->GetSize()
<< "\toriginal_ref_count:" << device_tensor->original_ref_count()
<< "\tdynamic_ref_count:" << device_tensor->dynamic_ref_count()
<< "\tis_somas_enable:" << kernel_info->IsTensorEnableSomas(somas_workspace, i) << "\n ";
}
DumpAbstractActor(actor, ofs);
@@ -232,6 +246,18 @@ void DumpKernelActor(const KernelActor *actor, std::ofstream &ofs) {
ofs << "\t\t\tmodifiable_ref_output_index:" << ref_output_index << "\n ";
}
}
const int32_t kInvalidPosition = -1;
if (actor->memory_alloc_insert_position().first != kInvalidPosition) {
std::string arrow_type = actor->memory_alloc_insert_position().second ? "data_arrow" : "control_arrow";
ofs << "\t\tmemory_alloc_insert_position:" << actor->memory_alloc_insert_position().first
<< "\tinsert_arrow_type:" << arrow_type << "\n";
}
if (actor->memory_free_insert_position().first != kInvalidPosition) {
std::string arrow_type = actor->memory_free_insert_position().second ? "data_arrow" : "control_arrow";
ofs << "\t\tmemory_free_insert_position:" << actor->memory_free_insert_position().first
<< "\tinsert_arrow_type:" << arrow_type << "\n";
}
ofs << "\n";
}
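
The dump encodes each memory alloc/free insert position as a {position, is_data_arrow} pair, with -1 (kInvalidPosition) meaning no position was recorded. A small sketch of that formatting; DescribeInsertPosition is our own helper name, not part of the patch:

#include <cstdint>
#include <string>
#include <utility>

std::string DescribeInsertPosition(const std::pair<int32_t, bool> &pos) {
  if (pos.first == -1) {  // kInvalidPosition in the patch
    return "none";
  }
  return std::to_string(pos.first) +
         (pos.second ? " via data_arrow" : " via control_arrow");
}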

View File

@@ -27,6 +27,20 @@
namespace mindspore {
namespace runtime {
namespace {
bool IsSomasEnable(const SomasInfo *somas_info) {
// Somas is currently disabled in the runtime when the memory optimize level is O0 or O1.
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
if ((ms_context->get_param<int>(MS_CTX_MEMORY_OPTIMIZE_LEVEL) == kOptimizeO0) ||
(ms_context->get_param<int>(MS_CTX_MEMORY_OPTIMIZE_LEVEL) == kOptimizeO1)) {
return false;
}
return ((somas_info != nullptr) && (somas_info->whole_block_size_ != 0));
}
} // namespace
using distributed::collective::CollectiveManager;
using distributed::recovery::RecoveryContext;
@@ -42,46 +56,18 @@ void KernelActor::Init() {
MS_EXCEPTION_IF_NULL(kernel_);
real_input_num_ = common::AnfAlgo::GetInputTensorNum(kernel_);
kernel_info_ = dynamic_cast<KernelInfo *>(kernel_->kernel_info());
MS_EXCEPTION_IF_NULL(kernel_info_);
is_dynamic_shape_ = common::AnfAlgo::IsDynamicShape(kernel_);
for (size_t i = 0; i < real_input_num_; ++i) {
const auto &input_device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(kernel_, i, false);
MS_EXCEPTION_IF_NULL(input_device_tensor);
(void)real_input_data_infos_.emplace_back(
std::make_shared<InputDataInfo>(input_device_tensor->format(), input_device_tensor->host_shape(),
input_device_tensor->GetSize(), input_device_tensor->type_id()));
if (is_dynamic_shape_ && IsSomasEnable(somas_info_)) {
MS_LOG(EXCEPTION) << "Not support the somas for the dynamic shape: " << GetAID().Name();
}
// Init the device tensors and kernel launch info.
copy_input_device_tensors_.resize(real_input_num_);
input_device_tensors_.resize(real_input_num_);
for (auto &input_address : input_device_tensors_) {
(void)memory_free_list_.emplace_back(input_address);
(void)launch_info_.inputs_.emplace_back(std::make_shared<Address>());
}
MS_EXCEPTION_IF_NULL(kernel_info_);
for (auto &output_address : kernel_info_->output_address_list()) {
MS_EXCEPTION_IF_NULL(output_address);
(void)output_device_tensors_.emplace_back(output_address.get());
(void)memory_alloc_list_.emplace_back(output_address.get());
(void)memory_free_list_.emplace_back(output_address.get());
(void)launch_info_.outputs_.emplace_back(std::make_shared<Address>());
}
for (auto &external_reference_tensor : external_reference_tensors_) {
(void)memory_free_list_.emplace_back(external_reference_tensor);
}
// The workspace sizes may change in dynamic shape, so put the workspace addresses at the end of memory_alloc_list_
// and memory_free_list_, so that FetchWorkspaceDeviceTensor can adjust them under the dynamic shape condition.
for (auto &workspace_address : kernel_info_->workspace_address_list()) {
MS_EXCEPTION_IF_NULL(workspace_address);
(void)workspace_device_tensors_.emplace_back(workspace_address.get());
(void)memory_alloc_list_.emplace_back(workspace_address.get());
(void)memory_free_list_.emplace_back(workspace_address.get());
(void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
}
InitInputInfo();
InitOutputInfo();
InitWorkspaceInfo();
// Init the output data.
output_data_by_output_index_.resize(output_device_tensors_.size());
InitOutputData();
if (output_data_.size() != output_data_arrows_.size()) {
MS_LOG(EXCEPTION) << "The output data size is wrong: " << GetAID().Name();
@@ -95,12 +81,88 @@ void KernelActor::Init() {
MS_LOG(EXCEPTION) << "The output index is out of range: " << GetAID().Name();
}
data->data_ = output_device_tensors_[IntToSize(data_arrow->from_output_index_)];
(void)output_data_by_output_index_[IntToSize(data_arrow->from_output_index_)].emplace_back(data);
++output_data_index;
}
}
void KernelActor::InitInputInfo() {
for (size_t i = 0; i < real_input_num_; ++i) {
const auto &input_device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(kernel_, i, false);
MS_EXCEPTION_IF_NULL(input_device_tensor);
(void)real_input_data_infos_.emplace_back(
std::make_shared<InputDataInfo>(input_device_tensor->format(), input_device_tensor->host_shape(),
input_device_tensor->GetSize(), input_device_tensor->type_id()));
}
copy_input_device_tensors_.resize(real_input_num_);
input_device_tensors_.resize(real_input_num_);
for (auto &input_address : input_device_tensors_) {
(void)memory_free_list_.emplace_back(input_address);
(void)launch_info_.inputs_.emplace_back(std::make_shared<Address>());
}
}
void KernelActor::InitOutputInfo() {
MS_EXCEPTION_IF_NULL(kernel_info_);
const auto &output_addresses = kernel_info_->output_address_list();
const auto &somas_outputs = kernel_info_->somas_output_result();
bool output_need_somas = false;
for (size_t i = 0; i < output_addresses.size(); ++i) {
auto &output_address = output_addresses[i];
MS_EXCEPTION_IF_NULL(output_address);
(void)output_device_tensors_.emplace_back(output_address.get());
(void)launch_info_.outputs_.emplace_back(std::make_shared<Address>());
// The output taken over by somas does not need memory allocation.
if (kernel_info_->IsTensorEnableSomas(somas_outputs, i)) {
MS_EXCEPTION_IF_CHECK_FAIL((somas_outputs[i].second >= output_address->GetSize()), "The somas size is wrong.");
UpdateRefCount(output_address.get(), true);
output_need_somas = true;
} else {
(void)memory_alloc_list_.emplace_back(output_address.get());
(void)memory_free_list_.emplace_back(output_address.get());
}
}
if (output_need_somas && (!IsSomasEnable(somas_info_))) {
MS_LOG(EXCEPTION) << "The somas is not enable for: " << GetAID().Name();
}
for (auto &external_reference_tensor : external_reference_tensors_) {
(void)memory_free_list_.emplace_back(external_reference_tensor);
}
}
void KernelActor::InitWorkspaceInfo() {
MS_EXCEPTION_IF_NULL(kernel_info_);
// The workspace sizes may change in dynamic shape, so put the workspace addresses at the end of memory_alloc_list_
// and memory_free_list_, so that FetchWorkspaceDeviceTensor can adjust them under the dynamic shape condition.
const auto &workspace_addresses = kernel_info_->workspace_address_list();
const auto &somas_workspace = kernel_info_->somas_workspace_result();
bool workspace_need_somas = false;
for (size_t i = 0; i < workspace_addresses.size(); ++i) {
auto &workspace_address = workspace_addresses[i];
MS_EXCEPTION_IF_NULL(workspace_address);
(void)workspace_device_tensors_.emplace_back(workspace_address.get());
(void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
// The workspace taken over by somas does not need memory allocation.
if (kernel_info_->IsTensorEnableSomas(somas_workspace, i)) {
MS_EXCEPTION_IF_CHECK_FAIL((somas_workspace[i].second >= workspace_address->GetSize()),
"The somas size is wrong.");
UpdateRefCount(workspace_address.get(), true);
workspace_need_somas = true;
} else {
(void)memory_alloc_list_.emplace_back(workspace_address.get());
(void)memory_free_list_.emplace_back(workspace_address.get());
}
}
if (workspace_need_somas && (!IsSomasEnable(somas_info_))) {
MS_LOG(EXCEPTION) << "The somas is not enable for: " << GetAID().Name();
}
}
void KernelActor::Run(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);
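
InitOutputInfo and InitWorkspaceInfo apply the same partition: a tensor either belongs to the somas block (its ref count is pinned via UpdateRefCount(addr, true) and it is kept out of the alloc/free lists) or it stays dynamically managed. A condensed sketch of that split under assumed names (MemoryPartition and SplitBySomas are ours):

#include <cstddef>
#include <utility>
#include <vector>

struct MemoryPartition {
  std::vector<void *> somas_managed;  // addressed later as base + offset
  std::vector<void *> dynamic;        // go through the memory alloc/free lists
};

MemoryPartition SplitBySomas(const std::vector<void *> &addresses,
                             const std::vector<std::pair<size_t, size_t>> &plan) {
  MemoryPartition p;
  for (size_t i = 0; i < addresses.size(); ++i) {
    const bool somas = i < plan.size() && plan[i].second != 0;
    (somas ? p.somas_managed : p.dynamic).push_back(addresses[i]);
  }
  return p;
}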
@@ -129,9 +191,12 @@ void KernelActor::FetchWorkspaceDeviceTensor() {
if (launch_info_.workspaces_.size() > workspace_sizes.size()) {
size_t size = launch_info_.workspaces_.size() - workspace_sizes.size();
(void)workspace_device_tensors_.erase(workspace_device_tensors_.end() - size, workspace_device_tensors_.end());
(void)launch_info_.workspaces_.erase(launch_info_.workspaces_.end() - size, launch_info_.workspaces_.end());
MS_EXCEPTION_IF_CHECK_FAIL((memory_alloc_list_.size() >= size), "The memory alloc list size is wrong.");
MS_EXCEPTION_IF_CHECK_FAIL((memory_free_list_.size() >= size), "The memory free list size is wrong.");
(void)memory_alloc_list_.erase(memory_alloc_list_.end() - size, memory_alloc_list_.end());
(void)memory_free_list_.erase(memory_free_list_.end() - size, memory_free_list_.end());
(void)launch_info_.workspaces_.erase(launch_info_.workspaces_.end() - size, launch_info_.workspaces_.end());
} else if (launch_info_.workspaces_.size() < workspace_sizes.size()) {
for (size_t i = launch_info_.workspaces_.size(); i < workspace_sizes.size(); ++i) {
auto device_address = device_contexts_[0]->device_res_manager_->CreateDeviceAddress(
@@ -141,9 +206,9 @@ void KernelActor::FetchWorkspaceDeviceTensor() {
AnfAlgo::SetWorkspaceAddr(device_address, i, kernel_.get()); // set to kernel_info
MS_EXCEPTION_IF_NULL(device_address);
(void)workspace_device_tensors_.emplace_back(device_address.get());
(void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
(void)memory_alloc_list_.emplace_back(device_address.get());
(void)memory_free_list_.emplace_back(device_address.get());
(void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
}
}
// Set workspace address new size
@@ -191,8 +256,47 @@ void FreeMemory(const std::vector<DeviceTensor *> &free_list, const DeviceContex
}
} // namespace
void KernelActor::SetSomasMemory(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(kernel_info_);
if (!IsSomasEnable(somas_info_)) {
return;
}
if (somas_info_->base_address_ == nullptr) {
std::string error_info = "The somas base address isn't allocated when running " + GetAID().Name();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR(*context, error_info);
}
// Set the memory address for the output tensors which use the somas.
const auto &somas_outputs = kernel_info_->somas_output_result();
MS_EXCEPTION_IF_CHECK_FAIL((output_device_tensors_.size() >= somas_outputs.size()), "The output num is wrong.");
for (size_t i = 0; i < somas_outputs.size(); ++i) {
if (somas_outputs[i].second > 0) {
// In this scenario, the Init function guarantees that the relevant pointers are not nullptr,
// so for performance the pointer validity is not rechecked here.
output_device_tensors_[i]->set_ptr(AddressOffset(somas_info_->base_address_, somas_outputs[i].first));
}
}
// Set the memory address for the workspace tensors which use the somas.
const auto &somas_workspace = kernel_info_->somas_workspace_result();
MS_EXCEPTION_IF_CHECK_FAIL((workspace_device_tensors_.size() >= somas_workspace.size()), "The workspace num is wrong.");
for (size_t i = 0; i < somas_workspace.size(); ++i) {
if (somas_workspace[i].second > 0) {
// In this scenario, the Init function guarantees that the relevant pointers are not nullptr,
// so for performance the pointer validity is not rechecked here.
workspace_device_tensors_[i]->set_ptr(AddressOffset(somas_info_->base_address_, somas_workspace[i].first));
}
}
}
void KernelActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
running_dependent_msg_num_ = 1;
// Set the memory address for the tensors which use the somas.
SetSomasMemory(context);
// Allocate the memory address for other tensors which don't use the somas.
if (strategy_ == GraphExecutionStrategy::kPipeline) {
if (ActorDispatcher::is_memory_allocation_sync()) {
ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &memory_alloc_list_,
@@ -361,13 +465,14 @@ void KernelActor::FetchInputDeviceTensor(OpContext<DeviceTensor> *const context)
if (data_iter != input_op_datas_.end()) {
for (auto &input_data : data_iter->second) {
MS_EXCEPTION_IF_NULL(input_data);
if (IntToSize(input_data->index_) >= input_device_tensors_.size()) {
size_t input_index = IntToSize(input_data->index_);
if (input_index >= input_device_tensors_.size()) {
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
}
if (input_device_tensors_[IntToSize(input_data->index_)] != input_data->data_) {
input_device_tensors_[IntToSize(input_data->index_)] = input_data->data_;
memory_free_list_[IntToSize(input_data->index_)] = input_data->data_;
if (input_device_tensors_[input_index] != input_data->data_) {
input_device_tensors_[input_index] = input_data->data_;
memory_free_list_[input_index] = input_data->data_;
}
CopyInputDeviceTensor(input_data, context);
}
@@ -407,30 +512,36 @@ void KernelActor::FetchOutputDeviceTensor(OpContext<DeviceTensor> *const context
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
const auto &somas_outputs = kernel_info_->somas_output_result();
// Update the size of output device tensor.
for (size_t i = 0; i < output_addresses.size(); ++i) {
auto output_address = output_addresses[i].get();
MS_EXCEPTION_IF_NULL(output_address);
if (output_size_list[i] != output_address->GetSize()) {
// 1. The size of output address may be changed in dynamic shape scenario.
// 2. If the format of the DeviceAddress is different, then the size is originally different.
// Such as NCHW(1,1,1,3) and NC1HWC0(1,1,1,1,16). So we don't need to update the size.
// 3. For example, we need to call cudnnGetRNNTrainingReserveSize to get real output size in LstmGpuKernelMod!
if (AnfAlgo::GetOutputFormat(kernel_, i) == output_address->format()) {
output_address->SetSize(output_size_list[i]);
}
// The output device tensor can't be changed.
if (output_device_tensors_[i] != output_address) {
std::string error_info =
"The device tensor can't be changed of " + GetAID().Name() + " with output index " + std::to_string(i);
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
// When the tensor is the output of the graph or in the dynamic shape scenario, the output tensor may be changed.
if (output_device_tensors_[i] != output_address) {
output_device_tensors_[i] = output_address;
memory_alloc_list_[i] = output_address;
memory_free_list_[real_input_num_ + i] = output_address;
if (output_size_list[i] == output_address->GetSize()) {
continue;
}
// Update output data.
for (auto &output_data : output_data_by_output_index_[i]) {
MS_EXCEPTION_IF_NULL(output_data);
output_data->data_ = output_address;
}
// Somas doesn't support the variable size.
if (kernel_info_->IsTensorEnableSomas(somas_outputs, i) && (somas_outputs[i].second < output_size_list[i])) {
std::string error_info =
"Somas doesn't support variable size of " + GetAID().Name() + " with output index " + std::to_string(i) +
". Suggest to turn off memory optimization by setting the context memory_optimize_level' to 'O0' ";
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
// 1. The size of output address may be changed in dynamic shape scenario.
// 2. If the format of the DeviceAddress is different, then the size is originally different.
// Such as NCHW(1,1,1,3) and NC1HWC0(1,1,1,1,16). So we don't need to update the size.
// 3. For example, we need to call cudnnGetRNNTrainingReserveSize to get real output size in LstmGpuKernelMod!
if (AnfAlgo::GetOutputFormat(kernel_, i) == output_address->format()) {
output_address->SetSize(output_size_list[i]);
}
}
}
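
The loop above rejects a somas-planned output that grows past its planned aligned size: a planned output may shrink under dynamic shape, but the offsets were computed statically, so growth would overrun the block. A sketch of that guard as a standalone predicate; the name FitsSomasPlan is ours, and the real check reports failure through SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY:

#include <cstddef>
#include <utility>
#include <vector>

bool FitsSomasPlan(const std::vector<std::pair<size_t, size_t>> &plan,
                   size_t index, size_t new_size) {
  const bool managed = index < plan.size() && plan[index].second != 0;
  return !managed || new_size <= plan[index].second;  // static offsets cap growth
}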
@@ -474,8 +585,11 @@ bool KernelActor::LaunchKernel(OpContext<DeviceTensor> *const) {
}
MS_EXCEPTION_IF_NULL(device_contexts_[0]);
return device_contexts_[0]->kernel_executor_->LaunchKernel(kernel_, launch_info_.inputs_, launch_info_.workspaces_,
launch_info_.outputs_, kernel_info_->stream_id());
MS_LOG(DEBUG) << "Begin launch kernel of actor: " << GetAID().Name();
auto ret = device_contexts_[0]->kernel_executor_->LaunchKernel(
kernel_, launch_info_.inputs_, launch_info_.workspaces_, launch_info_.outputs_, kernel_info_->stream_id());
MS_LOG(DEBUG) << "End launch kernel of actor: " << GetAID().Name();
return ret;
}
void KernelActor::PostLaunchKernel(OpContext<DeviceTensor> *const context) {

View File

@@ -136,6 +136,11 @@ class KernelActor : public DebugAwareActor {
friend class RpcNodeScheduler;
#endif
// Init the device tensors and kernel launch info.
void InitInputInfo();
void InitOutputInfo();
void InitWorkspaceInfo();
// Fetch the device tensor for launch.
void FetchInputDeviceTensor(OpContext<DeviceTensor> *const context);
void FetchOutputDeviceTensor(OpContext<DeviceTensor> *const context);
@@ -152,6 +157,9 @@ class KernelActor : public DebugAwareActor {
// Back refresh the dynamic device tensor stores that have been triggered copy.
void RefreshDeviceTensorCopyStore(OpContext<DeviceTensor> *const context);
// Set the memory address for the tensors which use the somas.
void SetSomasMemory(OpContext<DeviceTensor> *const context);
// The real input number of kernel launch.
size_t real_input_num_;
@@ -164,9 +172,6 @@ class KernelActor : public DebugAwareActor {
std::set<size_t> modifiable_ref_input_indexes_;
std::set<size_t> modifiable_ref_output_indexes_;
// Cache output data by output index to modify the output data effectively.
std::vector<std::vector<OpData<DeviceTensor> *>> output_data_by_output_index_;
// Whether skip the kernel launch.
bool is_launch_skipped_;