fix A+M dump probabilistic failure in CI: 1. do not dump statistics in parallel and remove the file lock. 2. when the total tensor size is small, do it in a single thread

TinaMengtingZhang 2022-03-08 17:25:08 -05:00
parent 8b7d7a1e1a
commit 646909d3f4
5 changed files with 115 additions and 97 deletions

View File

@@ -766,45 +766,82 @@ void E2eDump::DumpTensorToFile(const std::string &dump_path, const debugger::dum
   if (dump_tensor_vec.empty()) {
     return;
   }
-  auto default_num_workers = std::max<uint32_t>(1, std::thread::hardware_concurrency() / 4);
-  auto num_threads = std::min<uint32_t>(default_num_workers, dump_tensor_vec.size());
-  uint32_t task_size = dump_tensor_vec.size() / num_threads;
-  uint32_t remainder = dump_tensor_vec.size() % num_threads;
-  std::vector<std::thread> threads;
-  threads.reserve(num_threads);
-  MS_LOG(INFO) << "Number of threads used for A+M dump: " << num_threads;
-  for (size_t t = 0; t < threads.capacity(); t++) {
-    uint32_t start_idx = t * task_size;
-    uint32_t end_idx = start_idx + task_size - 1;
-    if (t == num_threads - 1) {
-      end_idx += remainder;
+  constexpr int kMaxTensorSize = 1048576;
+  if (offset <= kMaxTensorSize) {
+    // If the total tensor size is less than 1 MB, do it in a single thread.
+    ConvertFormatForTensors(&dump_tensor_vec, 0, dump_tensor_vec.size() - 1);
+  } else {
+    auto default_num_workers = std::max<uint32_t>(1, std::thread::hardware_concurrency() / 4);
+    auto num_threads = std::min<uint32_t>(default_num_workers, dump_tensor_vec.size());
+    uint32_t task_size = dump_tensor_vec.size() / num_threads;
+    uint32_t remainder = dump_tensor_vec.size() % num_threads;
+    std::vector<std::thread> threads;
+    threads.reserve(num_threads);
+    MS_LOG(INFO) << "Number of threads used for A+M dump: " << num_threads;
+    for (size_t t = 0; t < threads.capacity(); t++) {
+      uint32_t start_idx = t * task_size;
+      uint32_t end_idx = start_idx + task_size - 1;
+      if (t == num_threads - 1) {
+        end_idx += remainder;
+      }
+      threads.emplace_back(std::thread(&E2eDump::ConvertFormatForTensors, &dump_tensor_vec, start_idx, end_idx));
+    }
+    for (auto &thd : threads) {
+      if (thd.joinable()) {
+        thd.join();
+      }
     }
-    threads.emplace_back(std::thread(&E2eDump::ConvertFormatForTensors, std::ref(dump_tensor_vec), start_idx, end_idx));
   }
-  for (size_t t = 0; t < threads.capacity(); t++) {
-    threads[t].join();
+  for (auto &dump_tensor_item : dump_tensor_vec) {
+    (void)DumpTensorStatsIfNeeded(dump_tensor_item);
   }
 }
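A minimal self-contained sketch of the dispatch scheme above, with ProcessRange and kMaxTotalSize as illustrative stand-ins for ConvertFormatForTensors and kMaxTensorSize: work is split into contiguous inclusive [start_idx, end_idx] ranges, the division remainder is absorbed by the last thread, and a workload under the size threshold never spawns threads.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

// Illustrative stand-in for ConvertFormatForTensors: handles the inclusive range [start_idx, end_idx].
void ProcessRange(std::vector<int> *items, uint32_t start_idx, uint32_t end_idx) {
  for (uint32_t i = start_idx; i <= end_idx; i++) {
    (*items)[i] *= 2;  // placeholder for per-tensor format conversion
  }
}

void Dispatch(std::vector<int> *items, std::size_t total_bytes) {
  if (items->empty()) {
    return;
  }
  constexpr std::size_t kMaxTotalSize = 1048576;  // 1 MiB threshold, mirroring kMaxTensorSize
  if (total_bytes <= kMaxTotalSize) {
    // Small workload: thread startup would cost more than it saves.
    ProcessRange(items, 0, static_cast<uint32_t>(items->size() - 1));
    return;
  }
  auto default_num_workers = std::max<uint32_t>(1, std::thread::hardware_concurrency() / 4);
  auto num_threads = std::min<uint32_t>(default_num_workers, items->size());
  uint32_t task_size = items->size() / num_threads;  // every thread gets at least this many items
  uint32_t remainder = items->size() % num_threads;  // leftovers go to the last thread
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (uint32_t t = 0; t < num_threads; t++) {
    uint32_t start_idx = t * task_size;
    uint32_t end_idx = start_idx + task_size - 1;
    if (t == num_threads - 1) {
      end_idx += remainder;
    }
    threads.emplace_back(ProcessRange, items, start_idx, end_idx);
  }
  for (auto &thd : threads) {
    if (thd.joinable()) {
      thd.join();
    }
  }
}

int main() {
  std::vector<int> items(10, 1);
  Dispatch(&items, 8u * 1024 * 1024);  // pretend the tensors total 8 MiB -> parallel path
  std::cout << items[0] << "\n";       // prints 2
  return 0;
}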
-void E2eDump::ConvertFormatForTensors(const std::vector<dump_data_t> &dump_tensor_vec, uint32_t start_idx,
-                                      uint32_t end_idx) {
+void E2eDump::ConvertFormatForTensors(std::vector<dump_data_t> *dump_tensor_vec, uint32_t start_idx, uint32_t end_idx) {
   for (uint32_t idx = start_idx; idx <= end_idx; idx++) {
-    auto succ = ConvertFormatForTensorAndDump(dump_tensor_vec[idx]);
+    auto &dump_data_obj = dump_tensor_vec->at(idx);
+    auto succ = ConvertFormatForOneTensor(&dump_data_obj);
     if (!succ) {
-      MS_LOG(INFO) << "Failed to convert format for tensor " << dump_tensor_vec[idx].dump_file_path << "."
-                   << dump_tensor_vec[idx].in_out_str << "." << dump_tensor_vec[idx].slot;
+      MS_LOG(INFO) << "Failed to convert format for tensor " << dump_data_obj.dump_file_path << "."
+                   << dump_data_obj.in_out_str << "." << dump_data_obj.slot;
     }
+    (void)DumpTensorDataIfNeeded(dump_data_obj);
   }
 }
+/*
+ * Feature group: Dump.
+ * Target device group: Ascend.
+ * Runtime category: Old runtime, MindRT.
+ * Description: It serves for A+M dump. Save tensor into dump path as configured.
+ */
+bool E2eDump::DumpTensorDataIfNeeded(const dump_data_t &dump_tensor_info) {
+  if (!DumpJsonParser::GetInstance().IsTensorDump()) {
+    return true;
+  }
+  // dump_path: dump_dir/op_type.op_name.task_id.stream_id.timestamp
+  std::ostringstream dump_path_ss;
+  dump_path_ss << dump_tensor_info.dump_file_path << "." << dump_tensor_info.in_out_str << "." << dump_tensor_info.slot
+               << "." << dump_tensor_info.format;
+  std::string dump_path_slot = dump_path_ss.str();
+  std::shared_ptr<tensor::Tensor> trans_buf = dump_tensor_info.trans_buf;
+  bool dump_succ = false;
+  if (trans_buf) {
+    dump_succ = DumpJsonParser::DumpToFile(dump_path_slot, trans_buf->data_c(), trans_buf->Size(),
+                                           dump_tensor_info.host_shape, dump_tensor_info.data_type);
+  } else {
+    dump_succ = DumpJsonParser::DumpToFile(dump_path_slot, dump_tensor_info.data_ptr, dump_tensor_info.data_size,
+                                           dump_tensor_info.host_shape, dump_tensor_info.data_type);
+  }
+  return dump_succ;
+}
 /*
  * Feature group: Dump.
  * Target device group: Ascend.
  * Runtime category: Old runtime, MindRT.
  * Description: It serves for A+M dump. Save statistic of the tensor data into dump path as configured.
  */
-bool DumpTensorStatsIfNeeded(const dump_data_t &dump_tensor_info, char *data_ptr) {
+bool E2eDump::DumpTensorStatsIfNeeded(const dump_data_t &dump_tensor_info) {
   // dump_path: dump_dir/op_type.op_name.task_id.stream_id.timestamp
   if (!DumpJsonParser::GetInstance().IsStatisticDump()) {
     return true;
@@ -834,10 +871,16 @@ bool DumpTensorStatsIfNeeded(const dump_data_t &dump_tensor_info, char *data_ptr
     MS_LOG(ERROR) << "Data type of operator " << file_name << " is not supported by statistic dump";
     return false;
   }
+  std::shared_ptr<tensor::Tensor> trans_buf = dump_tensor_info.trans_buf;
+  if (trans_buf) {
+    data->SetByteSize(trans_buf->Size());
+    data->SetDataPtr(static_cast<char *>(trans_buf->data_c()));
+  } else {
+    data->SetByteSize(dump_tensor_info.data_size);
+    data->SetDataPtr(dump_tensor_info.data_ptr);
+  }
   data->SetType(dump_tensor_info.data_type);
-  data->SetByteSize(dump_tensor_info.data_size);
   data->SetShape(dump_tensor_info.host_shape);
-  data->SetDataPtr(data_ptr);
   return stat_dump.DumpTensorStatsToFile(dump_path.substr(0, pos), data);
 }
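The branch above picks the statistics source in one place: prefer the converted host-format buffer when format conversion produced one, fall back to the raw device dump otherwise. A compact hedged sketch of the same selection, with invented TensorInfo/StatSource types standing in for dump_data_t and the proto data object:

#include <cstddef>
#include <memory>
#include <vector>

// Invented stand-ins: the real code uses dump_data_t and a protobuf tensor object.
struct TensorInfo {
  std::shared_ptr<std::vector<char>> trans_buf;  // set only if format conversion succeeded
  char *data_ptr = nullptr;                      // raw device-format dump
  std::size_t data_size = 0;
};

struct StatSource {
  char *ptr = nullptr;
  std::size_t size = 0;
};

// Mirror of the branch above: prefer the converted host-format buffer, else the raw one.
StatSource SelectStatSource(const TensorInfo &info) {
  if (info.trans_buf) {
    return {info.trans_buf->data(), info.trans_buf->size()};
  }
  return {info.data_ptr, info.data_size};
}

int main() {
  TensorInfo raw_only;
  char bytes[4] = {1, 2, 3, 4};
  raw_only.data_ptr = bytes;
  raw_only.data_size = sizeof(bytes);
  StatSource src = SelectStatSource(raw_only);  // no trans_buf: falls back to the raw buffer
  return src.size == 4 ? 0 : 1;
}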
@@ -845,22 +888,16 @@ bool DumpTensorStatsIfNeeded(const dump_data_t &dump_tensor_info, char *data_ptr
  * Feature group: Dump.
  * Target device group: Ascend.
  * Runtime category: Old runtime, MindRT.
- * Description: It serves for A+M dump. Parse each attributes in Dumpdata proto object from device format to mindspore
- * supported format and save tensor data or statistic as configured.
+ * Description: It serves for A+M dump. Convert tensor from device format to host format if needed.
  */
-bool E2eDump::ConvertFormatForTensorAndDump(const dump_data_t &dump_tensor_info) {
-  // dump_path: dump_dir/op_type.op_name.task_id.stream_id.timestamp
-  std::ostringstream dump_path_ss;
-  dump_path_ss << dump_tensor_info.dump_file_path << "." << dump_tensor_info.in_out_str << "." << dump_tensor_info.slot
-               << ".";
-  std::string dump_path_slot = dump_path_ss.str();
+bool E2eDump::ConvertFormatForOneTensor(dump_data_t *dump_tensor_info) {
   bool trans_success = false;
-  auto trans_buf = std::vector<uint8_t>(dump_tensor_info.data_size);
+  auto trans_buf = std::make_shared<tensor::Tensor>(dump_tensor_info->data_type, dump_tensor_info->host_shape);
   // convert format to host format. It can be either NCHW or ND (non 4-dimensions).
   const uint8_t kNumFourDim = 4;
   std::string host_format;
-  std::string device_format = dump_tensor_info.format;
-  if (dump_tensor_info.host_shape.size() == kNumFourDim) {
+  std::string device_format = dump_tensor_info->format;
+  if (dump_tensor_info->host_shape.size() == kNumFourDim) {
     host_format = kOpFormat_NCHW;
   } else {
     host_format = kOpFormat_ND;
@@ -869,43 +906,28 @@ bool E2eDump::ConvertFormatForTensorAndDump(const dump_data_t &dump_tensor_info)
   auto iter = kSuppTransFormatPair.find(std::make_pair(device_format, host_format));
   if (iter == kSuppTransFormatPair.end()) {
     MS_LOG(INFO) << "Do not support convert from format " << device_format << " to " << host_format << " for tensor "
-                 << dump_path_slot;
+                 << dump_tensor_info->dump_file_path << "." << dump_tensor_info->in_out_str << "."
+                 << dump_tensor_info->slot;
   } else {
-    const trans::FormatArgs format_args{dump_tensor_info.data_ptr,
-                                        dump_tensor_info.data_size,
+    const trans::FormatArgs format_args{dump_tensor_info->data_ptr,
+                                        dump_tensor_info->data_size,
                                         host_format,
                                         device_format,
-                                        dump_tensor_info.host_shape,
-                                        dump_tensor_info.device_shape,
-                                        dump_tensor_info.data_type};
-    auto group = dump_tensor_info.sub_format > 1 ? dump_tensor_info.sub_format : 1;
-    trans_success = trans::TransFormatFromDeviceToHost(format_args, trans_buf.data(), group);
+                                        dump_tensor_info->host_shape,
+                                        dump_tensor_info->device_shape,
+                                        dump_tensor_info->data_type};
+    auto group = dump_tensor_info->sub_format > 1 ? dump_tensor_info->sub_format : 1;
+    trans_success = trans::TransFormatFromDeviceToHost(format_args, trans_buf->data_c(), group);
     if (!trans_success) {
      MS_LOG(ERROR) << "Trans format failed.";
    }
  }
-  // dump tensor data into npy file
-  bool dump_success = true;
  if (trans_success) {
-    dump_success = DumpTensorStatsIfNeeded(dump_tensor_info, reinterpret_cast<char *>(trans_buf.data()));
-    if (DumpJsonParser::GetInstance().IsTensorDump()) {
-      dump_path_slot += host_format;
-      dump_success = DumpJsonParser::DumpToFile(dump_path_slot, trans_buf.data(), dump_tensor_info.data_size,
-                                                dump_tensor_info.host_shape, dump_tensor_info.data_type) &&
-                     dump_success;
-    }
-  } else {
-    dump_success = DumpTensorStatsIfNeeded(dump_tensor_info, dump_tensor_info.data_ptr);
-    if (DumpJsonParser::GetInstance().IsTensorDump()) {
-      dump_path_slot += device_format;
-      dump_success = DumpJsonParser::DumpToFile(dump_path_slot, dump_tensor_info.data_ptr, dump_tensor_info.data_size,
-                                                dump_tensor_info.host_shape, dump_tensor_info.data_type) &&
-                     dump_success;
-    }
+    dump_tensor_info->format = host_format;
+    dump_tensor_info->trans_buf = trans_buf;
  }
-  return dump_success;
+  return trans_success;
 }
uint64_t UnpackUint64Value(char *ptr) {
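Taken together, the refactor splits the old convert-and-dump routine into two phases: format conversion (and the per-tensor npy writes) may run in parallel because each worker touches only its own disjoint slice of the vector, while statistic rows, which all append to one shared CSV, are written only after the workers join. A toy sketch of that invariant, assuming invented names (Item, ConvertRange, g_csv):

#include <cstddef>
#include <memory>
#include <sstream>
#include <thread>
#include <vector>

// Invented toy item standing in for dump_data_t.
struct Item {
  int value = 0;
  std::shared_ptr<int> converted;  // stands in for trans_buf
};

std::ostringstream g_csv;  // one shared sink, no lock, like CsvWriter after this commit

// Phase 1 (parallel-safe): each worker writes only its own disjoint slice of items.
void ConvertRange(std::vector<Item> *items, std::size_t begin, std::size_t end) {
  for (std::size_t i = begin; i < end; i++) {
    (*items)[i].converted = std::make_shared<int>((*items)[i].value * 2);
  }
}

int main() {
  std::vector<Item> items(8);
  for (std::size_t i = 0; i < items.size(); i++) {
    items[i].value = static_cast<int>(i);
  }
  // Phase 1: format conversion split across two workers.
  std::thread t1(ConvertRange, &items, std::size_t{0}, items.size() / 2);
  std::thread t2(ConvertRange, &items, items.size() / 2, items.size());
  t1.join();
  t2.join();
  // Phase 2: statistics only after join(), on this one thread, so the
  // unsynchronized shared sink is safe; this is the invariant the commit relies on.
  for (const auto &item : items) {
    g_csv << *item.converted << "\n";
  }
  return 0;
}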

View File

@@ -19,6 +19,7 @@
 #include <dirent.h>
 #include <map>
+#include <memory>
 #include <string>
 #include <vector>
@@ -46,6 +47,7 @@ struct dump_data_t {
   int32_t sub_format;
   std::string in_out_str;
   uint32_t slot;
+  std::shared_ptr<tensor::Tensor> trans_buf{nullptr};
 };
 class E2eDump {
@@ -116,10 +118,13 @@ class E2eDump {
 #ifdef ENABLE_D
   static nlohmann::json ParseOverflowInfo(char *data_ptr);
-  static bool ConvertFormatForTensorAndDump(const dump_data_t &dump_tensor_info);
+  static bool ConvertFormatForOneTensor(dump_data_t *dump_tensor_info);
-  static void ConvertFormatForTensors(const std::vector<dump_data_t> &dump_tensor_vec, uint32_t start_idx,
-                                      uint32_t end_idx);
+  static void ConvertFormatForTensors(std::vector<dump_data_t> *dump_tensor_vec, uint32_t start_idx, uint32_t end_idx);
+  static bool DumpTensorStatsIfNeeded(const dump_data_t &dump_tensor_info);
+  static bool DumpTensorDataIfNeeded(const dump_data_t &dump_tensor_info);
 #endif
   inline static unsigned int starting_graph_id = INT32_MAX;
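The new trans_buf member is what lets the conversion result travel with the record instead of through a char * out-parameter: a worker thread fills it, and the later (serial) dump and statistics passes use it if set. A hedged sketch of that ownership pattern, with a plain std::vector<uint8_t> standing in for tensor::Tensor:

#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Simplified mirror of dump_data_t: the converted buffer travels with the record,
// replacing the char *data_ptr out-parameter the old DumpTensorStatsIfNeeded took.
struct DumpRecord {
  const uint8_t *data_ptr = nullptr;                // device-format bytes (not owned)
  std::shared_ptr<std::vector<uint8_t>> trans_buf;  // host-format bytes (owned, optional)
};

// Stand-in for format conversion: returns an owning buffer that can outlive
// the worker thread that created it.
std::shared_ptr<std::vector<uint8_t>> FakeConvert(const uint8_t *src, std::size_t n) {
  return std::make_shared<std::vector<uint8_t>>(src, src + n);
}

int main() {
  std::vector<uint8_t> device_bytes = {1, 2, 3, 4};
  DumpRecord rec;
  rec.data_ptr = device_bytes.data();
  rec.trans_buf = FakeConvert(rec.data_ptr, device_bytes.size());
  // A later pass (possibly on another thread) uses trans_buf if set, else data_ptr.
  return rec.trans_buf ? 0 : 1;
}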

View File

@@ -52,35 +52,28 @@ bool CsvWriter::OpenFile(const std::string &path, const std::string &header) {
   }
   // try to open file
   std::string file_path_value = file_path.value();
-  {
-    std::lock_guard<std::mutex> lock(dump_csv_lock_);
-    if (file_.is_open()) {
-      return true;
-    }
-    bool first_time_opening = file_path_str_ != path;
-    ChangeFileMode(file_path_value, S_IWUSR);
-    if (first_time_opening) {
-      // remove any possible output from previous runs
-      file_.open(file_path_value, std::ios::out | std::ios::trunc | std::ios::binary);
-    } else {
-      file_.open(file_path_value, std::ios::out | std::ios::app | std::ios::binary);
-    }
-    if (!file_.is_open()) {
-      MS_LOG(WARNING) << "Open file " << file_path_value << " failed." << ErrnoToString(errno);
-      return false;
-    }
-    if (first_time_opening) {
-      file_ << header;
-      (void)file_.flush();
-      file_path_str_ = path;
-    }
-    MS_LOG(INFO) << "Opened file: " << file_path_value;
-  }
+  bool first_time_opening = file_path_str_ != path;
+  ChangeFileMode(file_path_value, S_IWUSR);
+  if (first_time_opening) {
+    // remove any possible output from previous runs
+    file_.open(file_path_value, std::ios::out | std::ios::trunc | std::ios::binary);
+  } else {
+    file_.open(file_path_value, std::ios::out | std::ios::app | std::ios::binary);
+  }
+  if (!file_.is_open()) {
+    MS_LOG(WARNING) << "Open file " << file_path_value << " failed." << ErrnoToString(errno);
+    return false;
+  }
+  if (first_time_opening) {
+    file_ << header;
+    (void)file_.flush();
+    file_path_str_ = path;
+  }
+  MS_LOG(INFO) << "Opened file: " << file_path_value;
   return true;
 }
 void CsvWriter::CloseFile() noexcept {
-  std::lock_guard<std::mutex> lock(dump_csv_lock_);
   if (file_.is_open()) {
     file_.close();
     ChangeFileMode(file_path_str_, S_IRUSR);
@@ -189,7 +182,6 @@ bool TensorStatDump::DumpTensorStatsToFile(const std::string &dump_path, const s
   }
   shape << ")\"";
   CsvWriter &csv = CsvWriter::GetInstance();
-  csv.Lock();
   csv.WriteToCsv(op_type_);
   csv.WriteToCsv(op_name_);
   csv.WriteToCsv(task_id_);
@@ -216,7 +208,6 @@ bool TensorStatDump::DumpTensorStatsToFile(const std::string &dump_path, const s
   csv.WriteToCsv(stat.neg_inf_count);
   csv.WriteToCsv(stat.pos_inf_count);
   csv.WriteToCsv(stat.zero_count, true);
-  csv.Unlock();
   return true;
 }
} // namespace mindspore
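With statistic rows now written from a single thread, the per-row Lock()/Unlock() pair and the mutex taken in OpenFile/CloseFile protect nothing, which is why they are deleted outright rather than narrowed. A minimal sketch of the resulting single-writer CSV pattern (SimpleCsv and its methods are invented for illustration, not the CsvWriter API):

#include <fstream>
#include <string>

// Minimal single-writer CSV sink, assuming all calls come from one thread,
// the invariant established by moving statistic dumps out of the worker pool.
class SimpleCsv {
 public:
  bool Open(const std::string &path, const std::string &header) {
    bool first_time = path != path_;
    file_.open(path, first_time ? (std::ios::out | std::ios::trunc) : (std::ios::out | std::ios::app));
    if (!file_.is_open()) {
      return false;
    }
    if (first_time) {
      file_ << header << "\n";  // write the header only on the first open of this path
      path_ = path;
    }
    return true;
  }
  template <typename T>
  void Write(const T &val, bool end_line = false) {
    file_ << val << (end_line ? "\n" : ",");
  }
  void Close() {
    if (file_.is_open()) {
      file_.close();
    }
  }

 private:
  std::ofstream file_;
  std::string path_;
};

int main() {
  SimpleCsv csv;
  if (csv.Open("/tmp/stats.csv", "op,min,max")) {
    csv.Write("Add");
    csv.Write(0);
    csv.Write(1, true);  // end_line terminates the row
    csv.Close();
  }
  return 0;
}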

View File

@@ -41,15 +41,12 @@ class CsvWriter {
   void CloseFile() noexcept;
   template <typename T>
   void WriteToCsv(const T &val, bool end_line = false);
-  void Lock() { dump_csv_lock_.lock(); }
-  void Unlock() { dump_csv_lock_.unlock(); }
 private:
   const std::string kSeparator = ",";
   const std::string kEndLine = "\n";
   std::ofstream file_;
   std::string file_path_str_ = "";
-  std::mutex dump_csv_lock_;
 };
class TensorStatDump {

View File

@@ -541,7 +541,8 @@ def test_stat_dump_nulls():
     assert output['Max Value'] == 'null'
     assert output['Avg Value'] == 'null'
-@pytest.mark.level1
+@pytest.mark.level0
 @pytest.mark.platform_arm_ascend_training
 @pytest.mark.platform_x86_ascend_training
 @pytest.mark.env_onecard
@@ -573,7 +574,8 @@ def test_ascend_statistic_dump_kernel_by_kernel():
     run_saved_data_dump_test('test_async_dump', 'statistic')
     del os.environ['GRAPH_OP_RUN']
-@pytest.mark.level1
+@pytest.mark.level0
 @pytest.mark.platform_arm_ascend_training
 @pytest.mark.platform_x86_ascend_training
 @pytest.mark.env_onecard
@@ -587,7 +589,8 @@ def test_ascend_tensor_dump():
     context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
     run_saved_data_dump_test('test_async_dump', 'tensor')
-@pytest.mark.level1
+@pytest.mark.level0
 @pytest.mark.platform_arm_ascend_training
 @pytest.mark.platform_x86_ascend_training
 @pytest.mark.env_onecard