Support starting the profiler in the middle of training.

This commit is contained in:
yuximiao 2021-11-30 10:42:00 +08:00
parent df0d5f645e
commit e99c0a48e6
16 changed files with 139 additions and 64 deletions
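The user-facing effect of this change set is that a `Profiler` can now be created with `start_profile=False` and switched on later. A minimal usage sketch of that flow, assuming the Python API as modified below (the MatMul loop is only a stand-in for real training steps):

```python
import numpy as np
from mindspore import Tensor, ops
from mindspore.profiler import Profiler

# Create the profiler but defer collection (new `start_profile` keyword below).
profiler = Profiler(output_path="./profiler_data", start_profile=False)

matmul = ops.MatMul()
x = Tensor(np.random.rand(64, 64).astype(np.float32))

for step in range(300):
    if step == 100:
        profiler.start()   # begin collecting in the middle of the run
    _ = matmul(x, x)       # stand-in for one training step
    if step == 200:
        profiler.stop()    # stop collecting after the window of interest

profiler.analyse()         # parse and save the collected performance data
```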

View File

@@ -73,8 +73,7 @@ bool DatasetIteratorKernel::Init(const CNodePtr &kernel_node) {
#ifndef ENABLE_SECURITY
auto profiler_inst = profiler::gpu::GPUProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);
profiling_enable_ = profiler_inst->GetEnableFlag();
if (profiling_enable_) {
if (profiler_inst->IsInitialized()) {
std::string path = profiler_inst->ProfileDataPath();
profiling_op_ = std::make_shared<GetNextProfiling>(path);
MS_EXCEPTION_IF_NULL(profiling_op_);
@@ -89,19 +88,27 @@ void DatasetIteratorKernel::InitSizeLists() { return; }
bool DatasetIteratorKernel::ReadDevice(void **addr, size_t *len) {
uint64_t start_time_stamp = 0;
uint32_t queue_size = 0;
#ifndef ENABLE_SECURITY
auto profiler_inst = profiler::gpu::GPUProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);
#endif
int repeat = 0;
while (true) {
#ifndef ENABLE_SECURITY
profiling_enable_ = profiler_inst->GetEnableFlag();
if (profiling_enable_) {
start_time_stamp = profiling_op_->GetTimeStamp();
queue_size = GpuBufferMgr::GetInstance().Size(handle_);
}
#endif
auto ret = GpuBufferMgr::GetInstance().Front(handle_, addr, len);
if (ret == device::SUCCESS) {
#ifndef ENABLE_SECURITY
if (profiling_enable_) {
uint64_t end_time_stamp = profiling_op_->GetTimeStamp();
profiling_op_->RecordData(queue_size, start_time_stamp, end_time_stamp);
}
#endif
break;
}
@@ -117,11 +124,12 @@ bool DatasetIteratorKernel::ReadDevice(void **addr, size_t *len) {
MS_LOG(EXCEPTION) << "Get data timeout";
}
}
#ifndef ENABLE_SECURITY
if (profiling_enable_) {
uint64_t end_time_stamp = profiling_op_->GetTimeStamp();
profiling_op_->RecordData(queue_size, start_time_stamp, end_time_stamp);
}
#endif
MS_LOG(ERROR) << "Get data failed, errcode " << ret;
return false;
}
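The effect of the kernel change above is to separate two states: the GetNext profiling op is created whenever the GPU profiler has been initialized (so the data path is known), while the enable flag is re-read on every `ReadDevice` call, so records start flowing as soon as profiling is switched on mid-training. A rough Python rendering of that per-read gating, using hypothetical stand-in objects rather than real MindSpore APIs:

```python
# Hypothetical mirror of the C++ gating above; `profiler`, `queue` and
# `records` are stand-ins, not real MindSpore Python objects.
def read_device(profiler, queue, records):
    start_ts = queue_size = None
    if profiler.enabled:           # checked on every read, not once at kernel Init
        start_ts = profiler.timestamp()
        queue_size = queue.size()
    data = queue.front()
    if profiler.enabled and start_ts is not None:
        records.append((queue_size, start_ts, profiler.timestamp()))
    return data
```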

View File

@@ -481,13 +481,14 @@ GraphId AscendSession::CompileGraphImpl(NotNull<FuncGraphPtr> func_graph) {
DumpAllGraphs(all_graphs);
// Save memory profiling data to proto file
#ifndef ENABLE_SECURITY
auto profiling_instance = MemoryProfiling::GetInstance();
if (profiling_instance.IsMemoryProfilingEnable()) {
if (MemoryProfiling::GetInstance().IsMemoryProfilingInitialized()) {
auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id_);
MS_EXCEPTION_IF_NULL(runtime_instance);
uint64_t mem_size = runtime_instance->GetAvailableMemMaxSize();
profiling_instance.SetDeviceMemSize(mem_size);
profiling_instance.SaveMemoryProfiling();
uint64_t mem_size = runtime_instance->GetMsUsedHbmSize();
MemoryProfiling::GetInstance().SetDeviceMemSize(mem_size);
if (MemoryProfiling::GetInstance().NeedSaveMemoryProfiling()) {
MemoryProfiling::GetInstance().SaveMemoryProfiling();
}
}
#endif
// return the root_graph id to backend

View File

@@ -19,10 +19,12 @@
#include "pybind_api/api_register.h"
#include "utils/log_adapter.h"
#include "utils/utils.h"
#include "profiler/device/ascend/memory_profiling.h"
#include "runtime/device/ascend/profiling/profiling_manager.h"
#include <nlohmann/json.hpp>
using mindspore::device::ascend::ProfilingManager;
using mindspore::profiler::ascend::MemoryProfiling;
namespace mindspore {
namespace profiler {
@@ -52,6 +54,8 @@ void AscendProfiler::InitProfiling(const std::string &profiling_path, uint32_t d
device_id_ = device_id;
(void)ProfilingManager::GetInstance().InitProfiling(profiling_path, device_id);
MemoryProfiling::GetInstance().SetMemoryProfilingInitialize(profiling_options_);
aclError aclRet = aclprofInit(profile_data_path_.c_str(), profile_data_path_.length());
if (aclRet != ACL_SUCCESS) {
MS_LOG(EXCEPTION) << "Failed to call aclprofInit function.";
@@ -112,6 +116,8 @@ void AscendProfiler::Start() {
}
MS_LOG(INFO) << "Start profiling, options mask is " << mask << " aic_metrics is " << aic_metrics;
MemoryProfiling::GetInstance().StartMemoryProfiling();
StepProfilingEnable(true);
}
@@ -132,6 +138,8 @@ void AscendProfiler::Stop() {
MS_LOG(EXCEPTION) << "Failed to call aclprofDestroyConfig function.";
}
MemoryProfiling::GetInstance().StopMemoryProfiling();
StepProfilingEnable(false);
}

View File

@@ -29,29 +29,28 @@ namespace profiler {
namespace ascend {
constexpr char kOutputPath[] = "output";
bool MemoryProfiling::IsMemoryProfilingEnable() const {
auto ascend_profiler = AscendProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(ascend_profiler);
if (!ascend_profiler->GetProfilingEnableFlag()) {
return false;
}
const std::string prof_options_str = ascend_profiler->GetProfilingOptions();
void MemoryProfiling::SetMemoryProfilingInitialize(const std::string &profiling_options) {
nlohmann::json options;
try {
options = nlohmann::json::parse(prof_options_str);
options = nlohmann::json::parse(profiling_options);
} catch (nlohmann::json::exception &e) {
MS_LOG(ERROR) << "Failed to parse profiling options.";
return false;
MS_LOG(EXCEPTION) << "Failed to parse profiling options because of format error.";
}
if (options["profile_memory"] == "off") {
return false;
if (options["profile_memory"] == "on") {
is_initialized_ = true;
}
return true;
}
void MemoryProfiling::StartMemoryProfiling() {
is_enabled_ = true;
if (NeedSaveMemoryProfiling()) {
SaveMemoryProfiling();
}
}
void MemoryProfiling::StopMemoryProfiling() { is_enabled_ = false; }
std::shared_ptr<GraphMemory> MemoryProfiling::AddGraphMemoryNode(uint32_t graph_id) {
std::shared_ptr<GraphMemory> node = std::make_shared<GraphMemory>(graph_id);
MS_EXCEPTION_IF_NULL(node);
@@ -70,6 +69,11 @@ std::shared_ptr<GraphMemory> MemoryProfiling::GetGraphMemoryNode(uint32_t graph_
bool MemoryProfiling::MemoryToPB() {
memory_proto_.set_total_mem(device_mem_size_);
if (graph_memory_.size() == 0) {
MS_LOG(INFO) << "No memory profiling data need to be reported.";
return false;
}
for (const auto &graph : graph_memory_) {
GraphMemProto *graph_proto = memory_proto_.add_graph_mem();
if (graph_proto == nullptr) {
@@ -127,15 +131,19 @@ void MemoryProfiling::SaveMemoryProfiling() {
if (device_id.empty()) {
device_id = "0";
}
if (!MemoryToPB()) {
return;
}
std::string file = dir_path + std::string("/memory_usage_") + std::string(device_id) + std::string(".pb");
MemoryToPB();
std::fstream handle(file, std::ios::out | std::ios::trunc | std::ios::binary);
if (!memory_proto_.SerializeToOstream(&handle)) {
MS_LOG(ERROR) << "Save memory profiling data to file failed";
}
handle.close();
has_save_memory_data_ = true;
MS_LOG(INFO) << "Start save memory profiling data to " << file << " end";
return;
}

View File

@@ -99,7 +99,7 @@ class GraphMemory {
class MemoryProfiling {
public:
MemoryProfiling() : device_mem_size_(0) {}
MemoryProfiling() : device_mem_size_(0), is_initialized_(false), is_enabled_(false), has_save_memory_data_(false) {}
~MemoryProfiling() = default;
static MemoryProfiling &GetInstance() {
@@ -107,17 +107,25 @@ class MemoryProfiling {
return instance;
}
bool IsMemoryProfilingEnable() const;
std::shared_ptr<GraphMemory> AddGraphMemoryNode(uint32_t graph_id);
std::shared_ptr<GraphMemory> GetGraphMemoryNode(uint32_t graph_id) const;
void SetDeviceMemSize(uint64_t size) { device_mem_size_ = size; }
bool MemoryToPB();
void SaveMemoryProfiling();
bool IsMemoryProfilingInitialized() const { return is_initialized_; }
bool IsMemoryProfilingEnabled() const { return is_enabled_; }
void SetMemoryProfilingInitialize(const std::string &profiling_options);
bool NeedSaveMemoryProfiling() { return (is_enabled_) && (!has_save_memory_data_); }
void StartMemoryProfiling();
void StopMemoryProfiling();
private:
MemoryProto memory_proto_;
std::map<uint32_t, std::shared_ptr<GraphMemory>> graph_memory_;
uint64_t device_mem_size_;
bool is_initialized_;
bool is_enabled_;
bool has_save_memory_data_;
};
} // namespace ascend
} // namespace profiler
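Read together with the .cc changes above, the class now tracks three independent flags: whether memory profiling was requested in the options (`is_initialized_`), whether collection is currently running (`is_enabled_`), and whether the data has already been written out (`has_save_memory_data_`). A small illustrative Python mirror of that state machine (names and structure are my own; the authoritative logic is the C++ above):

```python
class MemoryProfilingState:
    """Illustrative mirror of the flags added to MemoryProfiling."""

    def __init__(self):
        self.initialized = False   # profile_memory was "on" in the profiling options
        self.enabled = False       # StartMemoryProfiling() has been called
        self.saved = False         # memory_usage_*.pb has already been written

    def set_initialize(self, options):
        # Mirrors SetMemoryProfilingInitialize(): parse options, set the flag if "on".
        self.initialized = options.get("profile_memory") == "on"

    def need_save(self):
        # Mirrors NeedSaveMemoryProfiling(): save once, and only while enabled.
        return self.enabled and not self.saved

    def start(self):
        self.enabled = True
        if self.need_save():
            self.save()

    def stop(self):
        self.enabled = False

    def save(self):
        # Mirrors SaveMemoryProfiling() setting has_save_memory_data_.
        self.saved = True
```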

View File

@@ -415,6 +415,7 @@ void GPUProfiler::Init(const std::string &profileDataPath = "") {
profile_data_path_ = profileDataPath;
MS_LOG(INFO) << "GPU start time(ns):" << base_time_.gpu_start_time
<< " Host start time(ns):" << base_time_.host_start_time << " profile data path: " << profile_data_path_;
is_init_ = true;
}
void GPUProfiler::SetRunTimeData(const std::string &op_name, void *stream) {

View File

@@ -133,6 +133,7 @@ class GPUProfiler : public Profiler {
void RegisterProfilingOp(std::shared_ptr<ProfilingOp> node);
void SetStepTraceOpName(ProfilingTraceInfo trace_op_name);
std::string ProfileDataPath() const { return profile_data_path_; }
bool IsInitialized() { return is_init_; }
private:
void SingleOpLaunchTimeProcess(float op_time_elapsed);
@@ -175,6 +176,7 @@ class GPUProfiler : public Profiler {
std::string profile_data_path_;
std::map<std::string, std::shared_ptr<ProfilingOp>> profiling_op_;
ProfilingTraceInfo step_trace_op_name_;
bool is_init_ = false;
};
} // namespace gpu
} // namespace profiler

View File

@@ -1184,6 +1184,12 @@ uint64_t AscendKernelRuntime::GetAvailableMemMaxSize() const {
return ascend_mem_manager->GetMsMaxMemSize();
}
uint64_t AscendKernelRuntime::GetMsUsedHbmSize() const {
auto ascend_mem_manager = std::dynamic_pointer_cast<AscendMemoryManager>(mem_manager_);
MS_EXCEPTION_IF_NULL(ascend_mem_manager);
return ascend_mem_manager->GetMsUsedHbmSize();
}
bool AscendKernelRuntime::DeleteDumpDir(const std::string &path) {
string real_path = GetRealPath(path);
if (DeleteDumpFile(real_path) == -1) {

View File

@@ -73,6 +73,7 @@ class AscendKernelRuntime : public KernelRuntime {
void *GetModelStream(uint32_t graph_id) const override;
// add for MindRT
void ReleaseDeviceRes() override;
uint64_t GetMsUsedHbmSize() const;
protected:
DeviceAddressPtr CreateDeviceAddress(void *device_ptr, size_t device_size, const string &format,

View File

@@ -44,7 +44,7 @@ class AscendMemAdapter {
uint64_t FreeDevMemSize() { return static_mem_offset_ - max_dynamic_mem_offset_; }
uint64_t MaxHbmSizeForMs() { return max_available_ms_hbm_size_; }
uint64_t GetMsUsedHbmSize() { return ms_used_hbm_size_; }
std::string DevMemStatistics();
std::string DevMemDetailInfo();

View File

@@ -42,6 +42,8 @@ void AscendMemoryManager::ClearGlobalIdleMem() { AscendMemoryPool::GetInstance()
uint64_t AscendMemoryManager::GetMsMaxMemSize() { return AscendMemAdapter::GetInstance().MaxHbmSizeForMs(); }
uint64_t AscendMemoryManager::GetMsUsedHbmSize() { return AscendMemAdapter::GetInstance().GetMsUsedHbmSize(); }
void *AscendMemoryManager::MallocDevice(size_t size) {
auto align_size = GetCommonAlignSize(size);
return AscendMemoryPool::GetInstance().AllocTensorMem(align_size);
@@ -71,7 +73,7 @@ uint8_t *AscendMemoryManager::MallocStaticMem(size_t size, bool communication_me
MS_LOG(INFO) << "Malloc Memory for Static: size[" << align_size << "] communication_mem:" << communication_mem;
#ifndef ENABLE_SECURITY
if (MemoryProfiling::GetInstance().IsMemoryProfilingEnable() && graph_id != kInvalidGraphId) {
if (MemoryProfiling::GetInstance().IsMemoryProfilingInitialized() && graph_id != kInvalidGraphId) {
auto node = MemoryProfiling::GetInstance().GetGraphMemoryNode(graph_id);
if (node == nullptr) {
node = MemoryProfiling::GetInstance().AddGraphMemoryNode(graph_id);
@@ -109,7 +111,7 @@ uint8_t *AscendMemoryManager::MallocDynamicMem(size_t size, bool communication_m
void AscendMemoryManager::MallocSomasDynamicMem(const session::KernelGraph &graph) {
MemoryManager::MallocSomasDynamicMem(graph);
#ifndef ENABLE_SECURITY
if (MemoryProfiling::GetInstance().IsMemoryProfilingEnable()) {
if (MemoryProfiling::GetInstance().IsMemoryProfilingInitialized()) {
MS_EXCEPTION_IF_NULL(somas_reuse_util_ptr_);
somas_reuse_util_ptr_->ConvertToProfilingNode(graph.graph_id());
}

View File

@@ -45,6 +45,7 @@ class AscendMemoryManager : public MemoryManager {
void SwapIn(const void *host_ptr, void *device_ptr, size_t mem_size, void *stream) override;
void SwapOut(const void *device_ptr, void *host_ptr, size_t mem_size, void *stream) override;
size_t GetAvailableMemSize() override;
uint64_t GetMsUsedHbmSize();
protected:
uint8_t *MallocStaticMem(size_t size, bool communication_mem, uint32_t graph_id = kInvalidGraphId) override;

View File

@@ -109,6 +109,7 @@ class KernelRuntime {
virtual void PreInit() {}
#endif
virtual uint64_t GetAvailableMemMaxSize() const { return 0; }
virtual uint64_t GetMsUsedHbmSize() const { return 0; }
virtual void GenKernelEvents(const session::KernelGraph &graph);
virtual std::shared_ptr<DeviceEvent> CreateDeviceEvent() { return nullptr; }
virtual std::shared_ptr<DeviceEvent> CreateDeviceTimeEvent() { return nullptr; }

View File

@@ -419,11 +419,12 @@ void AscendDeviceContext::AllocateGraphMemory(const NotNull<KernelGraphPtr> &roo
runtime_instance_->UpdateRefNodeOutputMem(*root_graph.get());
#ifndef ENABLE_SECURITY
auto profiling_instance = MemoryProfiling::GetInstance();
if (profiling_instance.IsMemoryProfilingEnable()) {
uint64_t mem_size = runtime_instance_->GetAvailableMemMaxSize();
profiling_instance.SetDeviceMemSize(mem_size);
profiling_instance.SaveMemoryProfiling();
if (MemoryProfiling::GetInstance().IsMemoryProfilingInitialized()) {
uint64_t mem_size = runtime_instance_->GetMsUsedHbmSize();
MemoryProfiling::GetInstance().SetDeviceMemSize(mem_size);
if (MemoryProfiling::GetInstance().NeedSaveMemoryProfiling()) {
MemoryProfiling::GetInstance().SaveMemoryProfiling();
}
}
#endif
}

View File

@@ -264,11 +264,13 @@ class MinddataPipelineParser:
queue_size = output_queue.get('size')
if queue_size is None:
raise ValueError("The queue can not be None.")
queue_average_size = sum(queue_size) / len(queue_size)
if queue_size:
queue_average_size = sum(queue_size) / len(queue_size)
queue_length = output_queue.get('length')
if queue_length == 0:
raise ValueError("The length of queue can not be 0.")
queue_usage_rate = queue_average_size / queue_length
if queue_average_size is not None:
queue_usage_rate = queue_average_size / queue_length
children_id = op_node.get('children')
op_info = [
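The parser change above matters for a mid-training start: a queue may have collected no size samples at all, in which case the average and usage rate are simply left at their defaults instead of raising. A stand-alone sketch of the guarded computation (the default values here are assumptions, not taken from the parser):

```python
def queue_stats(queue_size, queue_length):
    # Defaults are assumptions for illustration; the real parser keeps its own.
    queue_average_size = None
    queue_usage_rate = 0
    if queue_size:  # may be empty when profiling started in the middle of training
        queue_average_size = sum(queue_size) / len(queue_size)
    if queue_length == 0:
        raise ValueError("The length of queue can not be 0.")
    if queue_average_size is not None:
        queue_usage_rate = queue_average_size / queue_length
    return queue_average_size, queue_usage_rate

print(queue_stats([], 64))       # (None, 0) rather than an error
print(queue_stats([8, 16], 64))  # (12.0, 0.1875)
```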

View File

@@ -146,25 +146,29 @@ class Profiler:
# Setup and start MindData Profiling
self._md_profiler = cde.GlobalContext.profiling_manager()
self._md_profiler.init()
self._md_profiler.start()
if self._device_target:
cpu_profiler = c_expression.CPUProfiler
self._cpu_profiler = cpu_profiler.get_instance()
self._cpu_profiler.init(self._output_path)
self._cpu_profiler.step_profiling_enable(True)
if self._device_target and self._device_target == "GPU":
gpu_profiler = c_expression.GPUProfiler
self._gpu_profiler = gpu_profiler.get_instance()
self._gpu_profiler.init(self._output_path)
self._gpu_profiler.step_profiling_enable(True)
if GlobalComm.WORLD_COMM_GROUP == "nccl_world_group":
self._dev_id = str(get_rank())
os.environ['DEVICE_ID'] = self._dev_id
self.start_profile = kwargs.pop("start_profile", True)
if not isinstance(self.start_profile, bool):
raise TypeError("The parameter start_profile must be bool.")
if kwargs:
logger.warning("Params not be supported yet on GPU.")
elif self._device_target and self._device_target == "Ascend":
self._init_time = int(time.time() * 10000000)
logger.info("Profiling: profiling init time: %d", self._init_time)
self._parse_parameter_for_ascend(**kwargs)
os.environ['DEVICE_ID'] = self._dev_id
@@ -186,10 +190,9 @@ class Profiler:
# add job id env through user input later
self._job_id_env = 0
self._init_time = int(time.time() * 10000000)
logger.info("Profiling: profiling init time: %d", self._init_time)
if self.start_profile:
self.start()
if self.start_profile:
self.start()
def _construct_profiling_options(self):
"""
@@ -238,6 +241,9 @@ class Profiler:
if self._profile_communication:
hccl_option = {"output": self._output_path, "task_trace": "on"}
os.environ['PROFILING_OPTIONS'] = json.dumps(hccl_option)
if not self.start_profile:
raise TypeError("The parameter profile_communication can not be True if want to start profiler in the "
"process of training.")
self._profile_memory = kwargs.pop("profile_memory", False)
if not isinstance(self._profile_memory, bool):
raise TypeError("The parameter profile_memory must be bool")
@@ -255,10 +261,11 @@ class Profiler:
msg = "Do not analyze twice in the profiler."
raise RuntimeError(msg)
Profiler._has_analysed = True
_environment_check()
self._cpu_profiler.stop()
self._md_profiler.stop()
self._md_profiler.save(self._output_path)
if self._device_target and self._device_target == "GPU":
self._gpu_analyse()
@@ -276,11 +283,12 @@ class Profiler:
self._rank_size = get_group_size()
release()
if self._has_started:
self._ascend_profiler.stop()
self.stop()
else:
msg = "The profiler has not start, so can not stop."
logger.info(msg)
logger.info("No need to stop profiler because profiler has been stopped.")
self._ascend_profiler.finalize()
job_id = self._get_profiling_job_id()
@@ -392,34 +400,50 @@ class Profiler:
def start(self):
"""Used for Ascend, start profiling."""
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: start time: %d", self._start_time)
if not self._has_started:
self._has_started = True
else:
msg = "The profiler has already started."
logger.error(msg)
raise RuntimeError(msg)
self._ascend_profiler.start()
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: start time: %d", self._start_time)
raise RuntimeError("The profiler has already started.")
self._md_profiler.start()
self._cpu_profiler.step_profiling_enable(True)
if self._device_target and self._device_target == "GPU":
self._gpu_profiler.step_profiling_enable(True)
elif self._device_target and self._device_target == "Ascend":
self._ascend_profiler.start()
def stop(self):
"""Used for Ascend, stop profiling."""
if self._has_started:
self._has_started = False
else:
msg = "The profiler has not start, so can not stop."
logger.error(msg)
raise RuntimeError(msg)
self._ascend_profiler.stop()
self._stop_time = int(time.time() * 10000000)
logger.info("Profiling: stop time: %d", self._stop_time)
raise RuntimeError("The profiler has not start, so can not stop.")
self._md_profiler.stop()
self._md_profiler.save(self._output_path)
if self._device_target and self._device_target == "GPU":
self._gpu_profiler.stop()
elif self._device_target and self._device_target == "Ascend":
self._ascend_profiler.stop()
self._stop_time = int(time.time() * 10000000)
logger.info("Profiling: stop time: %d", self._stop_time)
def _gpu_analyse(self):
"""Collect and analyse gpu performance data"""
self._dev_id = context.get_context("device_id")
if GlobalComm.WORLD_COMM_GROUP == "nccl_world_group":
self._dev_id = str(get_rank())
self._gpu_profiler.stop()
if self._has_started:
self.stop()
else:
logger.info("No need to stop profiler because profiler has been stopped.")
timeline_generator = self._generate_timeline()
# parse minddata pipeline operator and queue for GPU
@@ -609,6 +633,7 @@ class Profiler:
logger.warning("Find profiling job path %s, but start_time(%d) is earlier than this training "
"start_time(%d), profiler will ignore this job dir.",
job_dir, int(job_start_time), self._start_time)
continue
job_id = dir_name
break
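One constraint from the Ascend parameter parsing above is worth restating: HCCL communication profiling has to run from the very first step, so it cannot be combined with a deferred start. A quick sketch of the rejected combination, assuming an Ascend environment where the constructor reaches `_parse_parameter_for_ascend`:

```python
from mindspore.profiler import Profiler

# Rejected by the new check: profile_communication=True requires the profiler
# to start together with training (start_profile=True, the default).
try:
    Profiler(start_profile=False, profile_communication=True)
except TypeError as err:
    print(err)
```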