From 7f538b51e7f1379b2ea4964108e8389d46bc1d02 Mon Sep 17 00:00:00 2001 From: gongxiaoqing Date: Sat, 6 Feb 2021 14:42:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E9=80=80=20'Pull=20Request=20!11074?= =?UTF-8?q?=20:=20replace=20tdt=20with=20acltdt=20interface'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mindspore/ccsrc/CMakeLists.txt | 4 +- .../ccsrc/minddata/dataset/CMakeLists.txt | 2 +- .../ccsrc/minddata/dataset/api/datasets.cc | 8 +- .../dataset/include/datasets_bindings.cc | 7 +- .../engine/datasetops/device_queue_op.cc | 10 +- .../engine/ir/datasetops/transfer_node.cc | 14 +- .../engine/ir/datasetops/transfer_node.h | 4 +- .../dataset/engine/tdt/CMakeLists.txt | 9 +- .../minddata/dataset/engine/tdt/tdt_handle.cc | 39 --- .../minddata/dataset/engine/tdt/tdt_handle.h | 38 --- .../minddata/dataset/engine/tdt/tdt_plugin.cc | 174 ++++------- .../minddata/dataset/engine/tdt/tdt_plugin.h | 25 +- .../ccsrc/minddata/dataset/include/datasets.h | 6 +- mindspore/ccsrc/minddata/dataset/util/task.cc | 10 +- mindspore/ccsrc/runtime/device/CMakeLists.txt | 6 +- .../device/ascend/ascend_kernel_runtime.cc | 11 +- .../ccsrc/utils/context/context_extends.cc | 55 ++-- .../ccsrc/utils/context/context_extends.h | 6 +- mindspore/ccsrc/utils/tensorprint_utils.cc | 269 +++++++++--------- mindspore/ccsrc/utils/tensorprint_utils.h | 7 +- mindspore/core/utils/ms_context.cc | 19 -- mindspore/core/utils/ms_context.h | 14 +- mindspore/dataset/engine/datasets.py | 5 +- .../test_tensor_print/tensor_print_utils.py | 25 +- 24 files changed, 283 insertions(+), 484 deletions(-) delete mode 100644 mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc delete mode 100644 mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h diff --git a/mindspore/ccsrc/CMakeLists.txt b/mindspore/ccsrc/CMakeLists.txt index c26e244d180..a45720dcf7d 100644 --- a/mindspore/ccsrc/CMakeLists.txt +++ b/mindspore/ccsrc/CMakeLists.txt @@ -267,8 +267,6 @@ if(ENABLE_D) find_library(REGISTER register ${ASCEND_RUNTIME_PATH} ${ASCEND_TOOLKIT_RUNTIME_PATH}) find_library(PLATFORM platform ${ASCEND_RUNTIME_PATH} ${ASCEND_TOOLKIT_RUNTIME_PATH}) find_library(OPTILING optiling ${ASCEND_OPP_PATH} ${ASCEND_TOOLKIT_OPP_PATH}) - find_library(ACL ascendcl ${ASCEND_RUNTIME_PATH} ${ASCEND_TOOLKIT_RUNTIME_PATH}) - # hccl_adpter find_library(HCCL_ADPTER hcom_graph_adaptor ${ASCEND_RUNTIME_PATH} ${ASCEND_TOOLKIT_RUNTIME_PATH}) find_library(HCCL_RA ra ${ASCEND_RUNTIME_PATH} ${ASCEND_TOOLKIT_RUNTIME_PATH}) @@ -283,7 +281,7 @@ if(ENABLE_D) mindspore::protobuf -Wl,--end-group) target_link_libraries(mindspore ge_runtime ${CCE_LIB} ${RUNTIME_LIB} ${TSDCLIENT} ${HCCL} ${DATATRANSFER} ${HCCL_ADPTER} ${REGISTER} -Wl,--no-as-needed ${OPTILING} ${HCCL_BUILDER} - ${HCCL_RA} ${PLATFORM} ${ACL}) + ${HCCL_RA} ${PLATFORM}) target_link_libraries(mindspore -Wl,--start-group proto_input mindspore::protobuf -Wl,--end-group) elseif(CMAKE_SYSTEM_NAME MATCHES "Windows") target_link_libraries(mindspore -Wl,--start-group proto_input mindspore::protobuf mindspore::sentencepiece diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt index 28678026c68..ecacf5e4ff6 100644 --- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt @@ -264,7 +264,7 @@ if(ENABLE_GPUQUE) endif() if(ENABLE_TDTQUE) - target_link_libraries(_c_dataengine PRIVATE ${ACL}) + target_link_libraries(_c_dataengine PRIVATE ${TSDCLIENT}) endif() add_dependencies(_c_dataengine _c_mindrecord) diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index 8a79a2dcce8..4d9d52e9de9 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -131,8 +131,8 @@ std::shared_ptr Dataset::CreateIterator(std::vector colum #ifndef ENABLE_ANDROID // Function to return a transferred Node that transfers data through a device. -bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t device_id, int32_t num_epochs, - bool send_epoch_end, int32_t total_batches, bool create_data_info_queue) { +bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t num_epochs, bool send_epoch_end, + int32_t total_batches, bool create_data_info_queue) { Status rc; // Build and launch tree @@ -144,8 +144,8 @@ bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32 } // Add TransferNode IR on top of dataset - auto ds = std::make_shared(shared_from_this()->IRNode(), queue_name, device_type, device_id, - send_epoch_end, total_batches, create_data_info_queue); + auto ds = std::make_shared(shared_from_this()->IRNode(), queue_name, device_type, send_epoch_end, + total_batches, create_data_info_queue); // Get ToDevice consumer auto consumer = std::make_unique(num_epochs); diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc index 24fe3df9936..eb8ba07c09e 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc @@ -521,10 +521,9 @@ PYBIND_REGISTER(TransferNode, 2, ([](const py::module *m) { (void)py::class_>(*m, "TransferNode", "to create a TransferNode") .def(py::init([](std::shared_ptr self, std::string queue_name, std::string device_type, - int32_t device_id, bool send_epoch_end, int32_t total_batch, - bool create_data_info_queue) { - auto transfer = std::make_shared( - self, queue_name, device_type, device_id, send_epoch_end, total_batch, create_data_info_queue); + bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) { + auto transfer = std::make_shared(self, queue_name, device_type, send_epoch_end, + total_batch, create_data_info_queue); THROW_IF_ERROR(transfer->ValidateParams()); return transfer; })); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 378f5730a0a..624aa06f907 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -55,7 +55,6 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i #endif #ifdef ENABLE_TDTQUE ascend_keep_waiting_ = true; - tdtInstancePtr = std::make_shared(channel_name_, device_id_); #endif } @@ -153,7 +152,7 @@ Status DeviceQueueOp::SendDataToAscend() { RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); WaitContinueSignal(); auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost); - if (status != Status::OK()) { + if (status == TdtStatus::FAILED) { if (stop_send_) { MS_LOG(INFO) << "stop_send received"; return Status::OK(); @@ -184,9 +183,9 @@ Status DeviceQueueOp::SendDataToAscend() { } if (current_buffer->eoe() && send_epoch_end_) { TensorRow currRow; - auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, - ACL_TENSOR_DATA_END_OF_SEQUENCE); - if (status != Status::OK()) { + auto status = + tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, tdt::TDT_END_OF_SEQUENCE); + if (status == TdtStatus::FAILED) { if (stop_send_) { MS_LOG(INFO) << "stop_send received"; return Status::OK(); @@ -203,6 +202,7 @@ Status DeviceQueueOp::SendDataToAscend() { } RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } + tree_->SetFinished(); return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc index 1ba16748921..c85f555e0df 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc @@ -32,20 +32,20 @@ namespace dataset { // Constructor for TransferNode TransferNode::TransferNode(std::shared_ptr child, std::string queue_name, std::string device_type, - int32_t device_id, bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) + bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) : prefetch_size_(16), queue_name_(std::move(queue_name)), device_type_(std::move(device_type)), send_epoch_end_(send_epoch_end), total_batch_(total_batch), create_data_info_queue_(create_data_info_queue), - device_id_(device_id) { + device_id_(0) { this->AddChild(child); } std::shared_ptr TransferNode::Copy() { - auto node = std::make_shared(nullptr, queue_name_, device_type_, device_id_, send_epoch_end_, - total_batch_, create_data_info_queue_); + auto node = std::make_shared(nullptr, queue_name_, device_type_, send_epoch_end_, total_batch_, + create_data_info_queue_); return node; } @@ -96,9 +96,9 @@ Status TransferNode::Build(std::vector> *const node_o RETURN_STATUS_UNEXPECTED(err_msg); } -// // Get device ID (shard ID) from children -// device_id_ = 0; -// RETURN_IF_NOT_OK(this->GetShardId(&device_id_)); + // Get device ID (shard ID) from children + device_id_ = 0; + RETURN_IF_NOT_OK(this->GetShardId(&device_id_)); auto op = std::make_shared(queue_name_, type, device_id_, prefetch_size_, send_epoch_end_, total_batch_, create_data_info_queue_); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h index b136ea71bfa..9d5617f2a5b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h @@ -29,8 +29,8 @@ namespace dataset { class TransferNode : public DatasetNode { public: /// \brief Constructor - TransferNode(std::shared_ptr child, std::string queue_name, std::string device_type, int32_t device_id, - bool send_epoch_end, int32_t total_batch, bool create_data_info_queue); + TransferNode(std::shared_ptr child, std::string queue_name, std::string device_type, bool send_epoch_end, + int32_t total_batch, bool create_data_info_queue); /// \brief Destructor ~TransferNode() = default; diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt index 590f6db490b..50165bb147a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt @@ -1,6 +1,5 @@ -file( - GLOB_RECURSE _CURRENT_SRC_FILES - RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} - "*.cc") +file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) -add_library(engine-tdt OBJECT tdt_plugin.cc tdt_handle.cc) \ No newline at end of file +add_library(engine-tdt OBJECT + tdt_plugin.cc + ) diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc deleted file mode 100644 index 21f250073d7..00000000000 --- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright 2021 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "minddata/dataset/engine/tdt/tdt_handle.h" -namespace mindspore { -namespace dataset { - -std::vector TdtHandle::acl_handle = std::vector(); - -void TdtHandle::AddHandle(acltdtChannelHandle *handle) { - if (handle != nullptr) { - acl_handle.emplace_back(handle); - } -} - -bool TdtHandle::DestroyHandle() { - for (auto handle : acl_handle) { - if (handle != nullptr) { - if (acltdtDestroyChannel(handle) != ACL_SUCCESS) { - return false; - } - } - } - return true; -} -} // namespace dataset -} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h deleted file mode 100644 index 3c0cfdf839c..00000000000 --- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2021 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ -#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ - -#include -#include -#include "acl/acl_tdt.h" - -namespace mindspore { -namespace dataset { -class TdtHandle { - public: - static void AddHandle(acltdtChannelHandle *handle); - - static bool DestroyHandle(); - - private: - TdtHandle() {} - - static std::vector acl_handle; -}; -} // namespace dataset -} // namespace mindspore -#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc index 79ce95465c9..9bfdcacee45 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc @@ -23,138 +23,108 @@ namespace mindspore { namespace dataset { -TdtPlugin::TdtPlugin(const std::string &channel_name, int32_t device_id) { - // create acl tdt handle - acl_handle_ = acltdtCreateChannel(device_id, channel_name.c_str()); - if (acl_handle_ == nullptr) { - MS_LOG(ERROR) << "Failed to create channel for tdt queue."; +static std::shared_ptr instance_ptr_ = nullptr; + +std::shared_ptr TdtPlugin::GetInstance() { + if (instance_ptr_ == nullptr) { + instance_ptr_ = std::shared_ptr(new TdtPlugin); } - TdtHandle::AddHandle(acl_handle_); + return instance_ptr_; } -TdtPlugin::~TdtPlugin() { - if (acl_handle_ != nullptr && acltdtDestroyChannel(acl_handle_) != ACL_SUCCESS) { - MS_LOG(ERROR) << "Failed to destroy channel for tdt queue."; - } -} - -Status TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time, - acltdtTensorType tdt_type) { +TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time, + tdt::TdtDataType tdt_type) { MS_LOG(DEBUG) << "TDT channel name is " << channel_name << "."; - - acltdtDataset *acl_dataset = nullptr; + std::vector items; double start_time; - auto ret = translate(tdt_type, ts_row, &acl_dataset); - if (ret != Status::OK()) { - DestroyAclDataset(acl_dataset); - RETURN_STATUS_UNEXPECTED("TDT converting tensor failed!"); + if (tdt_type == tdt::TDT_TENSOR) { + auto ret = translate(ts_row, items); + if (ret != SUCCESS) { + MS_LOG(ERROR) << "TDT converting tensor failed!"; + return FAILED; + } + } else if (tdt_type == tdt::TDT_END_OF_SEQUENCE) { + DataItem data_item; + data_item.dataType_ = tdt::TDT_END_OF_SEQUENCE; + items.emplace_back(data_item); + MS_LOG(INFO) << "TDT data type is TDT_END_OF_SEQUENCE"; } - if (profiling) { start_time = ProfilingTime::GetCurMilliSecond(); } #if ENABLE_D // Data prefetch only when PS mode enables cache. - if (acltdtGetDatasetSize(acl_dataset) > 0) { - acltdtDataItem *item0 = acltdtGetDataItem(acl_dataset, 0); - if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, acltdtGetDataAddrFromItem(item0), - acltdtGetDataSizeFromItem(item0))) { - RETURN_STATUS_UNEXPECTED("PrefetchData failed in when pre-processing sending data."); + if (items.size() > 0) { + if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, items[0].dataPtr_.get(), items[0].dataLen_)) { + return FAILED; } } #endif - auto status = acltdtSendTensor(acl_handle_, acl_dataset, -1); - DestroyAclDataset(acl_dataset); - if (status != ACL_SUCCESS) { - RETURN_STATUS_UNEXPECTED("Tdt Send data failed."); + if (tdt::TdtHostPushData(channel_name, items) != 0) { + return FAILED; } if (profiling) { double end_time = ProfilingTime::GetCurMilliSecond(); time = (int32_t)(end_time - start_time); } - return Status::OK(); + return SUCCESS; } -Status TdtPlugin::getTdtType(DataType d_type, aclDataType &datatype) { +TdtStatus TdtPlugin::getTdtType(DataType d_type, std::string &datatype) { switch (d_type.value()) { case DataType::DE_BOOL: - datatype = ACL_BOOL; + datatype = "bool"; break; case DataType::DE_INT8: - datatype = ACL_INT8; + datatype = "int8"; break; case DataType::DE_UINT8: - datatype = ACL_UINT8; + datatype = "uint8"; break; case DataType::DE_INT16: - datatype = ACL_INT16; + datatype = "int16"; break; case DataType::DE_UINT16: - datatype = ACL_UINT16; + datatype = "uint16"; break; case DataType::DE_INT32: - datatype = ACL_INT32; + datatype = "int32"; break; case DataType::DE_UINT32: - datatype = ACL_UINT32; + datatype = "uint32"; break; case DataType::DE_FLOAT16: - datatype = ACL_FLOAT16; + datatype = "float16"; break; case DataType::DE_FLOAT32: - datatype = ACL_FLOAT; + datatype = "float32"; break; case DataType::DE_FLOAT64: - datatype = ACL_DOUBLE; + datatype = "float64"; break; case DataType::DE_INT64: - datatype = ACL_INT64; + datatype = "int64"; break; case DataType::DE_UINT64: - datatype = ACL_UINT64; + datatype = "uint64"; break; default: - RETURN_STATUS_UNEXPECTED("Invalid data, got unexpected data type."); + return FAILED; } - return Status::OK(); + return SUCCESS; } -Status TdtPlugin::translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset) { - auto acl_dataset = acltdtCreateDataset(); - if (acl_dataset == nullptr) { - RETURN_STATUS_UNEXPECTED("Create tdt dataset failed."); +TdtStatus TdtPlugin::translate(const TensorRow &ts_row, std::vector &items) { + if (ts_row.size() == 0) { + MS_LOG(ERROR) << "TDT the size of row is zero."; + return SUCCESS; } - auto status = AssembleTensor2AclDataset(tdt_type, ts_row, acl_dataset); - if (status != Status::OK()) { - DestroyAclDataset(acl_dataset); - RETURN_STATUS_UNEXPECTED("Assemble tensor row to tdt dataset failed."); - } - - *output_acl_dataset = acl_dataset; - return Status::OK(); -} - -Status TdtPlugin::AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row, - acltdtDataset *acl_dataset) { - if (tdt_type != ACL_TENSOR_DATA_TENSOR || ts_row.size() == 0) { - acltdtDataItem *acl_data = acltdtCreateDataItem(tdt_type, nullptr, 0, ACL_BOOL, nullptr, 0); - if (acl_data == nullptr) { - RETURN_STATUS_UNEXPECTED("Create data item failed when send data with type:" + std::to_string(tdt_type)); - } - if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) { - if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) { - MS_LOG(ERROR) << "Destroy data item failed when send data with type: " << tdt_type; - } - RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data."); - } - return Status::OK(); - } - for (auto ts : ts_row) { - aclDataType datatype; - acltdtDataItem *acl_data = nullptr; - RETURN_IF_NOT_OK(getTdtType(ts->type(), datatype)); - + std::string datatype; + TdtStatus status = getTdtType(ts->type(), datatype); + if (status != SUCCESS) { + return status; + } TensorShape tsShape = ts->shape(); std::string dataShapes = "["; for (auto dim : tsShape.AsVector()) { @@ -162,46 +132,18 @@ Status TdtPlugin::AssembleTensor2AclDataset(acltdtTensorType tdt_type, const Ten } dataShapes.pop_back(); (void)dataShapes.append("]"); - - std::shared_ptr dataPtr = + DataItem data_item; + data_item.dataType_ = tdt::TDT_TENSOR; + data_item.tensorShape_ = dataShapes; + data_item.tensorType_ = datatype; + data_item.dataLen_ = ts->SizeInBytes(); + data_item.dataPtr_ = std::shared_ptr(reinterpret_cast(&(*ts->begin())), [](const void *elem) {}); - size_t dataLen = ts->SizeInBytes(); - const dsize_t dims = tsShape.Rank(); - std::vector dataShape; - for (auto i = 0; i < dims; i++) { - dataShape.emplace_back(tsShape[i]); - } - acl_data = acltdtCreateDataItem(ACL_TENSOR_DATA_TENSOR, (tsShape.empty() ? nullptr : &dataShape[0]), dims, datatype, - dataPtr.get(), dataLen); - if (acl_data == nullptr) { - RETURN_STATUS_UNEXPECTED("Create data item failed when send data."); - } - if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) { - if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) { - MS_LOG(ERROR) << "Destroy data item failed when send data with type ACL_TENSOR_DATA_TENSOR."; - } - RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data."); - } - + items.emplace_back(data_item); MS_LOG(DEBUG) << "TDT data type is TDT_TENSOR, tensor type is " << datatype << ", tensor shape is " << dataShapes << ", data length is " << ts->Size() << "."; } - - return Status::OK(); -} - -Status TdtPlugin::DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item) { - if (include_data_item) { - for (size_t i = 0; i < acltdtGetDatasetSize(acl_dataset); i++) { - if (acltdtDestroyDataItem(acltdtGetDataItem(acl_dataset, i)) != ACL_SUCCESS) { - RETURN_STATUS_UNEXPECTED("Destroy data item failed when send data."); - } - } - } - if (acltdtDestroyDataset(acl_dataset) != ACL_SUCCESS) { - RETURN_STATUS_UNEXPECTED("Destroy tdt dataset failed when send data."); - } - return Status::OK(); + return SUCCESS; } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h index 0d2202d51bc..1275918c9fd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h +++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h @@ -22,40 +22,33 @@ #include #include #include -#include "acl/acl_tdt.h" -#include "minddata/dataset/engine/tdt/tdt_handle.h" +#include "tdt/tdt_host_interface.h" #include "minddata/dataset/core/data_type.h" #include "minddata/dataset/core/tensor.h" #include "minddata/dataset/core/tensor_row.h" -#include "minddata/dataset/util/status.h" namespace mindspore { namespace dataset { +enum TdtStatus { SUCCESS, FAILED }; + +using tdt::DataItem; class TdtPlugin { public: static std::shared_ptr GetInstance(); - Status hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time, - acltdtTensorType tdt_type = ACL_TENSOR_DATA_TENSOR); - - TdtPlugin(const std::string &channel_name, int32_t device_id); - - ~TdtPlugin(); + TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time, + tdt::TdtDataType tdt_type = tdt::TDT_TENSOR); private: - Status DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item = true); + TdtPlugin() {} - Status AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset *acl_dataset); + TdtStatus getTdtType(DataType d_type, std::string &datatype); - Status getTdtType(DataType d_type, aclDataType &datatype); - - Status translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset); + TdtStatus translate(const TensorRow &ts_row, std::vector &items); void *tdt_handle_ = nullptr; - - acltdtChannelHandle *acl_handle_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h index 40ad3f30644..421d3b9193e 100644 --- a/mindspore/ccsrc/minddata/dataset/include/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h @@ -152,16 +152,14 @@ class Dataset : public std::enable_shared_from_this { /// of data transmission per time is 256M. /// \param[in] queue_name Channel name (default="", create new unique name). /// \param[in] device_type Type of device (default="", get from MSContext). - /// \param[in] device_id id of device (default=0, get from MSContext). /// \param[in] num_epochs Number of epochs (default=-1, infinite epochs). /// \param[in] send_epoch_end Whether to send end of sequence to device or not (default=true). /// \param[in] total_batches Number of batches to be sent to the device (default=0, all data). /// \param[in] create_data_info_queue Whether to create queue which stores types and shapes /// of data or not(default=false). /// \return Returns true if no error encountered else false. - bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t device_id = 0, - int32_t num_epochs = -1, bool send_epoch_end = true, int32_t total_batches = 0, - bool create_data_info_queue = false); + bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t num_epochs = -1, + bool send_epoch_end = true, int32_t total_batches = 0, bool create_data_info_queue = false); /// \brief Function to create a Saver to save the dynamic data processed by the dataset pipeline /// \note Usage restrictions: diff --git a/mindspore/ccsrc/minddata/dataset/util/task.cc b/mindspore/ccsrc/minddata/dataset/util/task.cc index 5fda685b437..17fabbd77d2 100644 --- a/mindspore/ccsrc/minddata/dataset/util/task.cc +++ b/mindspore/ccsrc/minddata/dataset/util/task.cc @@ -23,9 +23,8 @@ #include "minddata/dataset/util/services.h" #endif #ifdef ENABLE_TDTQUE -#include "acl/acl_tdt.h" +#include "tdt/tdt_host_interface.h" #include "tdt/status.h" -#include "minddata/dataset/engine/tdt/tdt_handle.h" #endif namespace mindspore { @@ -164,10 +163,11 @@ Status Task::Join(WaitFlag blocking) { if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) { MS_LOG(WARNING) << "Wait " << wait_times << " seconds, " << "the task: " << my_name_ << " will be destroyed by TdtHostDestory."; - if (!TdtHandle::DestroyHandle()) { - MS_LOG(WARNING) << "Destroy tdt channel failed."; + int32_t destory_status = tdt::TdtHostDestroy(); + if (destory_status != TDT_OK_CODE) { + MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << "."; } else { - MS_LOG(INFO) << "Destroy tdt channel success."; + MS_LOG(INFO) << "Destroy tsd success."; } // just wait 30 seconds diff --git a/mindspore/ccsrc/runtime/device/CMakeLists.txt b/mindspore/ccsrc/runtime/device/CMakeLists.txt index 3709a6188f4..a2466381768 100644 --- a/mindspore/ccsrc/runtime/device/CMakeLists.txt +++ b/mindspore/ccsrc/runtime/device/CMakeLists.txt @@ -1,6 +1,5 @@ file(GLOB_RECURSE DEVICE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "common/*.cc" - "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc" - "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc" + "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc" "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc" ) if(ENABLE_GPU) @@ -10,8 +9,7 @@ else() endif() if(ENABLE_D) - file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc" - "../../minddata/dataset/engine/tdt/tdt_handle.cc") + file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc") endif() if(ENABLE_CPU) diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc index b8fa3aaf4ac..83edaa36217 100644 --- a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc +++ b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc @@ -54,8 +54,8 @@ #include "runtime/device/ascend/profiling/profiling_callback_register.h" #include "backend/kernel_compiler/hccl/hccl_context.h" #ifdef ENABLE_TDTQUE -#include "minddata/dataset/engine/tdt/tdt_handle.h" -using mindspore::dataset::TdtHandle; +#include "tdt/tdt_host_interface.h" +#include "tdt/status.h" #endif using ge::model_runner::ModelRunner; @@ -698,10 +698,11 @@ bool AscendKernelRuntime::RunTask(const session::KernelGraph *graph) { #ifdef ENABLE_TDTQUE // Run task error, we should call TdtHostDestroy to release tdt to avoid DeviceQueueOp hostPush hung // case1: cpu usage 100% cause thread/process exit, but some tdt thread remain in backend - if (!TdtHandle::DestroyHandle()) { - MS_LOG(WARNING) << "Destroy tdt channel failed."; + int32_t destory_status = tdt::TdtHostDestroy(); + if (destory_status != TDT_OK_CODE) { + MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << "."; } else { - MS_LOG(INFO) << "Destroy tdt channel success."; + MS_LOG(INFO) << "Destroy tsd success."; } #endif return false; diff --git a/mindspore/ccsrc/utils/context/context_extends.cc b/mindspore/ccsrc/utils/context/context_extends.cc index 6e3edc87baa..9c987b6f486 100644 --- a/mindspore/ccsrc/utils/context/context_extends.cc +++ b/mindspore/ccsrc/utils/context/context_extends.cc @@ -22,6 +22,7 @@ #include #include "pybind11/pybind11.h" + #include "utils/ms_utils.h" #include "utils/convert_utils_base.h" @@ -45,7 +46,7 @@ bool OpenTsd(const std::shared_ptr &ms_context_ptr) { } if (ms_context_ptr->get_param(MS_CTX_TSD_REF)) { - MS_LOG(DEBUG) << "ACLTDT Dataset client is already opened."; + MS_LOG(DEBUG) << "TDT Dataset client is already opened."; ms_context_ptr->increase_param(MS_CTX_TSD_REF); return true; } @@ -55,8 +56,10 @@ bool OpenTsd(const std::shared_ptr &ms_context_ptr) { return true; } - uint32_t rank_size = 1; - uint32_t device_id = ms_context_ptr->get_param(MS_CTX_DEVICE_ID); + unsigned int device_id; + unsigned int rank_size = 1; + + device_id = ms_context_ptr->get_param(MS_CTX_DEVICE_ID); auto rank_size_env = common::GetEnv("RANK_SIZE"); if (rank_size_env.empty()) { @@ -78,14 +81,14 @@ bool OpenTsd(const std::shared_ptr &ms_context_ptr) { } ms_context_ptr->increase_param(MS_CTX_TSD_REF); #ifdef ENABLE_TDTQUE - acltdtChannelHandle *acl_handle = ms_context_ptr->get_acl_tdt_channel_handle(); - if (acl_handle == nullptr) { - MS_LOG(EXCEPTION) << "Get acltdt handle failed"; + int32_t initStatus = tdt::TdtHostInit(device_id); + if (initStatus != TDT_OK_CODE) { + MS_LOG(EXCEPTION) << "Init tsd failed, status = " << initStatus << "."; return false; } - ms_context_ptr->acl_tdt_print = std::thread(TensorPrint(acl_handle)); + ms_context_ptr->tdt_print_ = std::thread(TensorPrint()); #endif - MS_LOG(INFO) << "Get the acltdt handle successful, tsd reference = " + MS_LOG(INFO) << "Open and init tsd successful, tsd reference = " << ms_context_ptr->get_param(MS_CTX_TSD_REF) << "."; return true; } @@ -100,34 +103,28 @@ bool CloseTsd(const std::shared_ptr &ms_context_ptr, bool force) { ms_context_ptr->decrease_param(MS_CTX_TSD_REF); if (force || ms_context_ptr->get_param(MS_CTX_TSD_REF) == 0) { ms_context_ptr->set_param(MS_CTX_TSD_REF, 0); - #ifdef ENABLE_TDTQUE - acltdtChannelHandle *acl_handle = ms_context_ptr->get_acl_tdt_channel_handle(); - aclError stopStatus = acltdtStopChannel(acl_handle); - if (stopStatus != ACL_SUCCESS) { - MS_LOG(ERROR) << "Failed stop acl data channel for host queue "; - } else { - MS_LOG(INFO) << "Succeed stop acl data channel for host queue "; + int32_t stopStatus = tdt::TdtHostStop(KNpuLog); + if (stopStatus != TDT_OK_CODE) { + MS_LOG(EXCEPTION) << "Stop tsd failed, status = " << stopStatus << "."; + return false; } - MS_LOG(INFO) << "Succeed run cancellation callback of out-feed dequeue op "; - py::gil_scoped_release gil_release; - aclError destrodStatus = acltdtDestroyChannel(acl_handle); - if (destrodStatus != ACL_SUCCESS) { - MS_LOG(ERROR) << "Failed destroy acl channel for out-feed dequeue op "; - } else { - MS_LOG(INFO) << "Succeed destroy acl channel for out-feed dequeue op "; + int32_t destroyStatus = tdt::TdtHostDestroy(); + if (destroyStatus != TDT_OK_CODE) { + MS_LOG(EXCEPTION) << "Destroy tsd failed, status = " << destroyStatus << "."; + return false; } try { - if (ms_context_ptr->acl_tdt_print.joinable()) { - MS_LOG(INFO) << "join acl tdt host receive process"; - ms_context_ptr->acl_tdt_print.join(); + if (ms_context_ptr->tdt_print_.joinable()) { + MS_LOG(INFO) << "join tdt host receive process"; + ms_context_ptr->tdt_print_.join(); } } catch (const std::exception &e) { MS_LOG(ERROR) << "tdt thread join failed: " << e.what(); } #endif - uint32_t device_id = ms_context_ptr->get_param(MS_CTX_DEVICE_ID); + auto device_id = ms_context_ptr->get_param(MS_CTX_DEVICE_ID); auto ret = rtDeviceReset(device_id); if (ret != RT_ERROR_NONE) { MS_LOG(EXCEPTION) << "Device " << device_id << " call rtDeviceReset failed, ret[" << static_cast(ret) << "]"; @@ -136,9 +133,10 @@ bool CloseTsd(const std::shared_ptr &ms_context_ptr, bool force) { ms_context_ptr->set_param(MS_CTX_IS_PYNATIVE_GE_INIT, false); MS_LOG(INFO) << "Call rtDeviceReset, destroy and close tsd successful, ret[" << static_cast(ret) << "]"; } else { - MS_LOG(DEBUG) << "Acltdt Dataset client is used, no need to close, tsd reference = " + MS_LOG(DEBUG) << "TDT Dataset client is used, no need to close, tsd reference = " << ms_context_ptr->get_param(MS_CTX_TSD_REF) << "."; } + return true; } #else @@ -232,7 +230,7 @@ void GetGeOptions(const std::shared_ptr &ms_context_ptr, std::map &ms_context_ptr) { ms_context_ptr->get_param(MS_CTX_GE_REF) || ms_context_ptr->get_param(MS_CTX_TSD_REF)) { return true; } - (void)OpenTsd(ms_context_ptr); (void)InitGe(ms_context_ptr); ms_context_ptr->set_param(MS_CTX_IS_PYNATIVE_GE_INIT, true); diff --git a/mindspore/ccsrc/utils/context/context_extends.h b/mindspore/ccsrc/utils/context/context_extends.h index 3d8ca425643..36e6036e173 100644 --- a/mindspore/ccsrc/utils/context/context_extends.h +++ b/mindspore/ccsrc/utils/context/context_extends.h @@ -24,8 +24,8 @@ #include "utils/tensorprint_utils.h" #ifndef NO_DLIB -#include "acl/acl_tdt.h" #include "tdt/tsd_client.h" +#include "tdt/tdt_host_interface.h" #include "tdt/data_common.h" #include "runtime/dev.h" #endif @@ -35,8 +35,8 @@ namespace mindspore { namespace context { -bool OpenTsd(const std::shared_ptr &ms_context_ptr); -bool CloseTsd(const std::shared_ptr &ms_context_ptr, bool force = false); +bool OpenTsd(const std::shared_ptr &inst_context); +bool CloseTsd(const std::shared_ptr &inst_context, bool force = false); void SetHcclOptions(const std::shared_ptr &inst_context, std::map *ge_options); void GetGeOptions(const std::shared_ptr &inst_context, std::map *ge_options); void SetDisableReuseMemoryFlag(std::map *ge_options); diff --git a/mindspore/ccsrc/utils/tensorprint_utils.cc b/mindspore/ccsrc/utils/tensorprint_utils.cc index 173843a5d81..a92d1e2e463 100644 --- a/mindspore/ccsrc/utils/tensorprint_utils.cc +++ b/mindspore/ccsrc/utils/tensorprint_utils.cc @@ -1,5 +1,5 @@ /** - * Copyright 2020-2021 Huawei Technologies Co., Ltd + * Copyright 2020 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,48 +23,75 @@ #include "pybind11/pybind11.h" #include "utils/ms_utils.h" #include "utils/shape_utils.h" +#ifndef NO_DLIB +#include "tdt/tsd_client.h" +#include "tdt/tdt_host_interface.h" +#include "tdt/data_common.h" +#endif namespace py = pybind11; namespace mindspore { +const char kShapeSeperator[] = ","; +const char kShapeScalar[] = "[0]"; +const char kShapeNone[] = "[]"; +static std::map print_type_map = { + {"int8_t", TypeId::kNumberTypeInt8}, {"uint8_t", TypeId::kNumberTypeUInt8}, + {"int16_t", TypeId::kNumberTypeInt16}, {"uint16_t", TypeId::kNumberTypeUInt16}, + {"int32_t", TypeId::kNumberTypeInt32}, {"uint32_t", TypeId::kNumberTypeUInt32}, + {"int64_t", TypeId::kNumberTypeInt64}, {"uint64_t", TypeId::kNumberTypeUInt64}, + {"float16", TypeId::kNumberTypeFloat16}, {"float", TypeId::kNumberTypeFloat32}, + {"double", TypeId::kNumberTypeFloat64}, {"bool", TypeId::kNumberTypeBool}}; -#ifndef NO_DLIB -static std::map print_acl_data_type_map = { - {ACL_INT8, TypeId::kNumberTypeInt8}, {ACL_UINT8, TypeId::kNumberTypeUInt8}, - {ACL_INT16, TypeId::kNumberTypeInt16}, {ACL_UINT16, TypeId::kNumberTypeUInt16}, - {ACL_INT32, TypeId::kNumberTypeInt32}, {ACL_UINT32, TypeId::kNumberTypeUInt32}, - {ACL_INT64, TypeId::kNumberTypeInt64}, {ACL_UINT64, TypeId::kNumberTypeUInt64}, - {ACL_FLOAT16, TypeId::kNumberTypeFloat16}, {ACL_FLOAT, TypeId::kNumberTypeFloat32}, - {ACL_DOUBLE, TypeId::kNumberTypeFloat64}, {ACL_BOOL, TypeId::kNumberTypeBool}}; +static std::map type_size_map = { + {"int8_t", sizeof(int8_t)}, {"uint8_t", sizeof(uint8_t)}, {"int16_t", sizeof(int16_t)}, + {"uint16_t", sizeof(uint16_t)}, {"int32_t", sizeof(int32_t)}, {"uint32_t", sizeof(uint32_t)}, + {"int64_t", sizeof(int64_t)}, {"uint64_t", sizeof(uint64_t)}, {"float16", sizeof(float) / 2}, + {"float", sizeof(float)}, {"double", sizeof(double)}, {"bool", sizeof(bool)}}; -static std::map acl_data_type_size_map = { - {ACL_INT8, sizeof(int8_t)}, {ACL_UINT8, sizeof(uint8_t)}, {ACL_INT16, sizeof(int16_t)}, - {ACL_UINT16, sizeof(uint16_t)}, {ACL_INT32, sizeof(int32_t)}, {ACL_UINT32, sizeof(uint32_t)}, - {ACL_INT64, sizeof(int64_t)}, {ACL_UINT64, sizeof(uint64_t)}, {ACL_FLOAT16, sizeof(float) / 2}, - {ACL_FLOAT, sizeof(float)}, {ACL_DOUBLE, sizeof(double)}, {ACL_BOOL, sizeof(bool)}}; - -std::string GetParseType(const aclDataType &acl_data_type) { - static const std::map print_tensor_parse_map = { - {ACL_INT8, "Int8"}, {ACL_UINT8, "Uint8"}, {ACL_INT16, "Int16"}, {ACL_UINT16, "Uint16"}, - {ACL_INT32, "Int32"}, {ACL_UINT32, "Uint32"}, {ACL_INT64, "Int64"}, {ACL_UINT64, "Uint64"}, - {ACL_FLOAT16, "Float16"}, {ACL_FLOAT, "Float32"}, {ACL_DOUBLE, "Float64"}, {ACL_BOOL, "Bool"}}; - auto type_iter = print_tensor_parse_map.find(acl_data_type); - if (type_iter == print_tensor_parse_map.end()) { - MS_LOG(EXCEPTION) << "type of tensor need to print is not support " << acl_data_type; +std::string GetParseType(const std::string &tensorType_) { + static const std::map print_parse_map = { + {"int8_t", "Int8"}, {"uint8_t", "Uint8"}, {"int16_t", "Int16"}, {"uint16_t", "Uint16"}, + {"int32_t", "Int32"}, {"uint32_t", "Uint32"}, {"int64_t", "Int64"}, {"uint64_t", "Uint64"}, + {"float16", "Float16"}, {"float", "Float32"}, {"double", "Float64"}, {"bool", "Bool"}}; + auto type_iter = print_parse_map.find(tensorType_); + if (type_iter == print_parse_map.end()) { + MS_LOG(EXCEPTION) << "type of tensor need to print is not support " << tensorType_; } return type_iter->second; } +bool ParseTensorShape(const std::string &input_shape_str, ShapeVector *const tensor_shape, size_t *dims) { + if (tensor_shape == nullptr) { + return false; + } + MS_EXCEPTION_IF_NULL(dims); + std::string shape_str = input_shape_str; + if (shape_str.size() <= 2) { + return false; + } + (void)shape_str.erase(shape_str.begin()); + shape_str.pop_back(); + shape_str += kShapeSeperator; + string::size_type pos_begin = 0; + string::size_type pos_end = shape_str.find(kShapeSeperator); + while (pos_end != std::string::npos) { + string dim_str = shape_str.substr(pos_begin, pos_end - pos_begin); + tensor_shape->emplace_back(std::stoi(dim_str)); + (*dims) = (*dims) * std::stoul(dim_str); + pos_begin = pos_end + sizeof(kShapeSeperator) - 1; + pos_end = shape_str.find(kShapeSeperator, pos_begin); + } + return true; +} + bool PrintTensorToString(const char *str_data_ptr, mindspore::tensor::Tensor *const print_tensor, const size_t &memory_size) { MS_EXCEPTION_IF_NULL(str_data_ptr); MS_EXCEPTION_IF_NULL(print_tensor); auto *tensor_data_ptr = static_cast(print_tensor->data_c()); MS_EXCEPTION_IF_NULL(tensor_data_ptr); - - size_t dest_size = static_cast(print_tensor->data().nbytes()); - size_t target_size = memory_size; - - auto cp_ret = memcpy_s(tensor_data_ptr, dest_size, str_data_ptr, target_size); + auto cp_ret = + memcpy_s(tensor_data_ptr, static_cast(print_tensor->data().nbytes()), str_data_ptr, memory_size); if (cp_ret != EOK) { MS_LOG(ERROR) << "Print op Failed to copy the memory to py::tensor " << cp_ret; return false; @@ -73,10 +100,10 @@ bool PrintTensorToString(const char *str_data_ptr, mindspore::tensor::Tensor *co } template -void PrintScalarToString(const char *str_data_ptr, const aclDataType &acl_data_type, std::ostringstream *const buf) { +void PrintScalarToString(const char *str_data_ptr, const string &tensor_type, std::ostringstream *const buf) { MS_EXCEPTION_IF_NULL(str_data_ptr); MS_EXCEPTION_IF_NULL(buf); - *buf << "Tensor(shape=[], dtype=" << GetParseType(acl_data_type) << ", value="; + *buf << "Tensor(shape=[], dtype=" << GetParseType(tensor_type) << ", value="; const T *data_ptr = reinterpret_cast(str_data_ptr); if constexpr (std::is_same::value || std::is_same::value) { const int int_data = static_cast(*data_ptr); @@ -86,12 +113,11 @@ void PrintScalarToString(const char *str_data_ptr, const aclDataType &acl_data_t } } -void PrintScalarToBoolString(const char *str_data_ptr, const aclDataType &acl_data_type, - std::ostringstream *const buf) { +void PrintScalarToBoolString(const char *str_data_ptr, const string &tensor_type, std::ostringstream *const buf) { MS_EXCEPTION_IF_NULL(str_data_ptr); MS_EXCEPTION_IF_NULL(buf); const bool *data_ptr = reinterpret_cast(str_data_ptr); - *buf << "Tensor(shape=[], dtype=" << GetParseType(acl_data_type) << ", value="; + *buf << "Tensor(shape=[], dtype=" << GetParseType(tensor_type) << ", value="; if (*data_ptr) { *buf << "True)\n"; } else { @@ -99,99 +125,89 @@ void PrintScalarToBoolString(const char *str_data_ptr, const aclDataType &acl_da } } -void convertDataItem2Scalar(const char *str_data_ptr, const aclDataType &acl_data_type, std::ostringstream *const buf) { +void convertDataItem2Scalar(const char *str_data_ptr, const string &tensor_type, std::ostringstream *const buf) { MS_EXCEPTION_IF_NULL(str_data_ptr); MS_EXCEPTION_IF_NULL(buf); - auto type_iter = print_acl_data_type_map.find(acl_data_type); + auto type_iter = print_type_map.find(tensor_type); auto type_id = type_iter->second; if (type_id == TypeId::kNumberTypeBool) { - PrintScalarToBoolString(str_data_ptr, acl_data_type, buf); + PrintScalarToBoolString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeInt8) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeUInt8) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeInt16) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeUInt16) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeInt32) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeUInt32) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeInt64) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeUInt64) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeFloat16) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeFloat32) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else if (type_id == TypeId::kNumberTypeFloat64) { - PrintScalarToString(str_data_ptr, acl_data_type, buf); + PrintScalarToString(str_data_ptr, tensor_type, buf); } else { - MS_LOG(EXCEPTION) << "Cannot print scalar because of unsupported data type: " << GetParseType(acl_data_type) << "."; + MS_LOG(EXCEPTION) << "Cannot print scalar because of unsupported data type: " << tensor_type << "."; } } -bool judgeLengthValid(const size_t str_len, const aclDataType &acl_data_type) { - auto type_iter = acl_data_type_size_map.find(acl_data_type); - if (type_iter == acl_data_type_size_map.end()) { +bool judgeLengthValid(const size_t str_len, const string &tensor_type) { + auto type_iter = type_size_map.find(tensor_type); + if (type_iter == type_size_map.end()) { MS_LOG(EXCEPTION) << "type of scalar to print is not support."; } return str_len == type_iter->second; } -bool ConvertDataset2Tensor(acltdtDataset *acl_dataset) { +#ifndef NO_DLIB +bool ConvertDataItem2Tensor(const std::vector &items) { // Acquire Python GIL py::gil_scoped_acquire gil_acquire; std::ostringstream buf; bool ret_end_sequence = false; - - size_t acl_dataset_size = acltdtGetDatasetSize(acl_dataset); - - for (size_t i = 0; i < acl_dataset_size; i++) { - acltdtDataItem *item = acltdtGetDataItem(acl_dataset, i); - if (acltdtGetTensorTypeFromItem(item) == ACL_TENSOR_DATA_END_OF_SEQUENCE) { + for (auto &item : items) { + if (item.dataType_ == tdt::TDT_END_OF_SEQUENCE) { ret_end_sequence = true; - MS_LOG(INFO) << "end of sequence" << std::endl; break; } - - size_t dim_num = acltdtGetDimNumFromItem(item); - void *acl_addr = acltdtGetDataAddrFromItem(item); - size_t acl_data_size = acltdtGetDataSizeFromItem(item); - aclDataType acl_data_type = acltdtGetDataTypeFromItem(item); - char *acl_data = reinterpret_cast(acl_addr); - acl_data = const_cast(reinterpret_cast(acl_data)->c_str()); - MS_EXCEPTION_IF_NULL(acl_data); - - ShapeVector tensorShape; - tensorShape.resize(dim_num); - - if (acltdtGetDimsFromItem(item, tensorShape.data(), dim_num) != ACL_SUCCESS) { - MS_LOG(ERROR) << "ACL failed get dim-size from acl channel data"; - } - - if ((tensorShape.size() == 1 && tensorShape[0] == 0) || tensorShape.size() == 0) { - if (!judgeLengthValid(acl_data_size, acl_data_type)) { + std::shared_ptr str_data_ptr = std::static_pointer_cast(item.dataPtr_); + MS_EXCEPTION_IF_NULL(str_data_ptr); + if (item.tensorShape_ == kShapeScalar || item.tensorShape_ == kShapeNone) { + if (!judgeLengthValid(str_data_ptr->size(), item.tensorType_)) { MS_LOG(EXCEPTION) << "Print op receive data length is invalid."; } - convertDataItem2Scalar(acl_data, acl_data_type, &buf); + convertDataItem2Scalar(str_data_ptr->data(), item.tensorType_, &buf); continue; } - if (acl_data_type == ACL_STRING) { - std::string data(reinterpret_cast(acl_data), acl_data_size); + ShapeVector tensor_shape; + size_t totaldims = 1; + if (!ParseTensorShape(item.tensorShape_, &tensor_shape, &totaldims)) { + MS_LOG(ERROR) << "Tensor print can not parse tensor shape, receive info" << item.tensorShape_; + continue; + } + + if (item.tensorType_ == "string") { + std::string data(reinterpret_cast(str_data_ptr->c_str()), item.dataLen_); buf << data << std::endl; } else { - auto type_iter = print_acl_data_type_map.find(acl_data_type); - if (type_iter == print_acl_data_type_map.end()) { - MS_LOG(ERROR) << "type of tensor need to print is not support " << GetParseType(acl_data_type); + auto type_iter = print_type_map.find(item.tensorType_); + if (type_iter == print_type_map.end()) { + MS_LOG(ERROR) << "type of tensor need to print is not support " << item.tensorType_; continue; } auto type_id = type_iter->second; - mindspore::tensor::Tensor print_tensor(type_id, tensorShape); - if (PrintTensorToString(acl_data, &print_tensor, acl_data_size)) { + mindspore::tensor::Tensor print_tensor(type_id, tensor_shape); + auto memory_size = totaldims * type_size_map[item.tensorType_]; + if (PrintTensorToString(str_data_ptr->data(), &print_tensor, memory_size)) { buf << print_tensor.ToStringNoLimit() << std::endl; } } @@ -200,63 +216,44 @@ bool ConvertDataset2Tensor(acltdtDataset *acl_dataset) { return ret_end_sequence; } -bool SaveDataset2File(acltdtDataset *acl_dataset, const std::string &print_file_path, prntpb::Print print, - std::fstream *output) { +bool SaveDataItem2File(const std::vector &items, const std::string &print_file_path, prntpb::Print print, + std::fstream *output) { bool ret_end_thread = false; - - for (size_t i = 0; i < acltdtGetDatasetSize(acl_dataset); i++) { - acltdtDataItem *item = acltdtGetDataItem(acl_dataset, i); - MS_EXCEPTION_IF_NULL(item); - acltdtTensorType acl_tensor_type = acltdtGetTensorTypeFromItem(item); - - if (acl_tensor_type == ACL_TENSOR_DATA_END_OF_SEQUENCE) { - MS_LOG(INFO) << "Acl channel received end-of-sequence for print op."; + for (auto &item : items) { + if (item.dataType_ == tdt::TDT_END_OF_SEQUENCE) { ret_end_thread = true; break; - } else if (acl_tensor_type == ACL_TENSOR_DATA_ABNORMAL) { - MS_LOG(INFO) << "Acl channel received abnormal for print op."; - return true; - } else if (acl_tensor_type == ACL_TENSOR_DATA_UNDEFINED) { - MS_LOG(INFO) << "Acl channel received undefined message type for print op."; - return false; } - prntpb::Print_Value *value = print.add_value(); - size_t dim_num = acltdtGetDimNumFromItem(item); - void *acl_addr = acltdtGetDataAddrFromItem(item); - size_t acl_data_size = acltdtGetDataSizeFromItem(item); - aclDataType acl_data_type = acltdtGetDataTypeFromItem(item); - char *acl_data = reinterpret_cast(acl_addr); - MS_EXCEPTION_IF_NULL(acl_data); - - ShapeVector tensorShape; - tensorShape.resize(dim_num); - - if (acltdtGetDimsFromItem(item, tensorShape.data(), dim_num) != ACL_SUCCESS) { - MS_LOG(ERROR) << "ACL failed get dim-size from acl channel data"; - } - - if ((tensorShape.size() == 1 && tensorShape[0] == 0) || tensorShape.size() == 0) { - if (!judgeLengthValid(acl_data_size, acl_data_type)) { + std::shared_ptr str_data_ptr = std::static_pointer_cast(item.dataPtr_); + MS_EXCEPTION_IF_NULL(str_data_ptr); + if (item.tensorShape_ == kShapeScalar || item.tensorShape_ == kShapeNone) { + if (!judgeLengthValid(str_data_ptr->size(), item.tensorType_)) { MS_LOG(ERROR) << "Print op receive data length is invalid."; ret_end_thread = true; } } - if (acl_data_type == ACL_STRING) { - std::string data(reinterpret_cast(acl_data), acl_data_size); + ShapeVector tensor_shape; + size_t totaldims = 1; + if (!ParseTensorShape(item.tensorShape_, &tensor_shape, &totaldims)) { + MS_LOG(ERROR) << "Tensor print can not parse tensor shape, receive info" << item.tensorShape_; + ret_end_thread = true; + } + + if (item.tensorType_ == "string") { + std::string data(reinterpret_cast(str_data_ptr->c_str()), item.dataLen_); value->set_desc(data); } else { - auto parse_type = GetParseType(acl_data_type); + auto parse_type = GetParseType(item.tensorType_); prntpb::TensorProto *tensor = value->mutable_tensor(); - if (tensorShape.size() > 1 || (tensorShape.size() == 1 && tensorShape[0] != 1)) { - for (const auto &dim : tensorShape) { + if (!(item.tensorShape_ == kShapeScalar) && !(item.tensorShape_ == kShapeNone)) { + for (const auto &dim : tensor_shape) { tensor->add_dims(static_cast<::google::protobuf::int64>(dim)); } } - tensor->set_tensor_type(parse_type); - std::string data(reinterpret_cast(acl_data), acl_data_size); + std::string data(reinterpret_cast(str_data_ptr->c_str()), item.dataLen_); tensor->set_tensor_content(data); } @@ -277,37 +274,29 @@ void TensorPrint::operator()() { std::string print_file_path = ms_context->get_param(MS_CTX_PRINT_FILE_PATH); if (print_file_path == "") { while (true) { - acltdtDataset *acl_dataset = acltdtCreateDataset(); - if (acl_dataset == nullptr) { - MS_LOG(ERROR) << "Failed create acl dateaset."; - } - if (acltdtReceiveTensor(acl_handle_, acl_dataset, -1 /* no timeout */) != ACL_SUCCESS) { - MS_LOG(ERROR) << "Acltdt receive tensor failed"; + std::vector bundle; + if (tdt::TdtHostPopData("_npu_log", bundle) != 0) { break; } - if (ConvertDataset2Tensor(acl_dataset)) { + if (ConvertDataItem2Tensor(bundle)) { break; } } } else { std::fstream output(print_file_path, std::ios::out | std::ios::trunc | std::ios::binary); while (true) { - acltdtDataset *acl_dataset = acltdtCreateDataset(); - if (acl_dataset == nullptr) { - MS_LOG(ERROR) << "Failed create acl dateaset."; - } - if (acltdtReceiveTensor(acl_handle_, acl_dataset, -1 /* no timeout */) != ACL_SUCCESS) { - MS_LOG(ERROR) << "Acltdt receive tensor failed"; + std::vector bundle; + if (tdt::TdtHostPopData("_npu_log", bundle) != 0) { break; } - if (SaveDataset2File(acl_dataset, print_file_path, print, &output)) { + if (SaveDataItem2File(bundle, print_file_path, print, &output)) { break; } } output.close(); std::string path_string = print_file_path; if (chmod(common::SafeCStr(path_string), S_IRUSR) == -1) { - MS_LOG(ERROR) << "Modify file:" << print_file_path << " to fail."; + MS_LOG(ERROR) << "Modify file:" << print_file_path << " to r fail."; return; } } diff --git a/mindspore/ccsrc/utils/tensorprint_utils.h b/mindspore/ccsrc/utils/tensorprint_utils.h index cf84e710991..b150368f71f 100644 --- a/mindspore/ccsrc/utils/tensorprint_utils.h +++ b/mindspore/ccsrc/utils/tensorprint_utils.h @@ -20,10 +20,9 @@ #include #include "ir/dtype/type.h" #ifndef NO_DLIB -#include "acl/acl_tdt.h" #include "tdt/tsd_client.h" -#include "tdt/data_common.h" #include "tdt/tdt_host_interface.h" +#include "tdt/data_common.h" #include "proto/print.pb.h" #include "utils/ms_context.h" #endif @@ -33,11 +32,7 @@ class TensorPrint { TensorPrint() {} ~TensorPrint() = default; #ifndef NO_DLIB - explicit TensorPrint(acltdtChannelHandle *acl_handle) { acl_handle_ = acl_handle; } void operator()(); - - private: - acltdtChannelHandle *acl_handle_ = nullptr; #endif }; } // namespace mindspore diff --git a/mindspore/core/utils/ms_context.cc b/mindspore/core/utils/ms_context.cc index cfd2ffc7a26..7d3afd23105 100644 --- a/mindspore/core/utils/ms_context.cc +++ b/mindspore/core/utils/ms_context.cc @@ -50,7 +50,6 @@ MsContext::MsContext(const std::string &policy, const std::string &target) { } else { set_param(MS_CTX_DEVICE_ID, 0); } - set_param(MS_CTX_MAX_CALL_DEPTH, MAX_CALL_DEPTH_DEFAULT); set_param(MS_CTX_DEVICE_TARGET, target); set_param(MS_CTX_EXECUTION_MODE, kPynativeMode); @@ -108,22 +107,4 @@ std::string MsContext::backend_policy() const { } return "unknown"; } - -#ifdef ENABLE_TDTQUE -acltdtChannelHandle *MsContext::get_acl_tdt_channel_handle() { - if (acl_handle == nullptr) { - std::string kReceivePrefix = "TF_RECEIVE_"; - std::string channel_name = "_npu_log"; - uint32_t device_id = get_param(MS_CTX_DEVICE_ID); - acl_handle = acltdtCreateChannel(device_id, (kReceivePrefix + channel_name).c_str()); - if (acl_handle == nullptr) { - MS_LOG(ERROR) << "Failed to create acltdt handle : " << channel_name; - return nullptr; - } - MS_LOG(INFO) << "Success to create acltdt handle: " << channel_name; - return acl_handle; - } - return acl_handle; -} -#endif } // namespace mindspore diff --git a/mindspore/core/utils/ms_context.h b/mindspore/core/utils/ms_context.h index 09e37d43975..3beeabeb922 100644 --- a/mindspore/core/utils/ms_context.h +++ b/mindspore/core/utils/ms_context.h @@ -24,10 +24,7 @@ #include #include #include "utils/log_adapter.h" -#include "utils/ms_utils.h" -#ifndef NO_DLIB -#include "acl/acl_tdt.h" -#endif + namespace mindspore { enum MsBackendPolicy { kMsBackendGeOnly = 0, @@ -132,13 +129,11 @@ class MsContext { std::string backend_policy() const; bool set_backend_policy(const std::string &policy); -#ifdef ENABLE_TDTQUE - acltdtChannelHandle *get_acl_tdt_channel_handle(); -#endif + static void device_seter(DeviceSeter device) { seter_ = device; } static void device_type_seter(DeviceTypeSeter device_type) { device_type_seter_ = device_type; } - std::thread acl_tdt_print; + std::thread tdt_print_; template void set_param(MsCtxParam param, const T &value) { @@ -173,9 +168,6 @@ class MsContext { std::string string_params_[MsCtxParam::NUM_STRING_PARAMS]; MsBackendPolicy backend_policy_; -#ifdef ENABLE_TDTQUE - acltdtChannelHandle *acl_handle = nullptr; -#endif }; // set method implementation for type bool/int/uint32_t/float/std::string diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index 0f2325586d7..42737274665 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -2698,11 +2698,10 @@ class TransferDataset(Dataset): def parse(self, children=None): total_batch = 0 - device_id = context.get_context("device_id") if hasattr(self.children[0], "__total_batch__"): total_batch = self.children[0].__total_batch__ - return cde.TransferNode(children[0], self.queue_name, self.device_type, device_id, self._send_epoch_end, - total_batch, self._create_data_info_queue) + return cde.TransferNode(children[0], self.queue_name, self.device_type, self._send_epoch_end, total_batch, + self._create_data_info_queue) def create_dict_iterator(self, num_epochs=-1, output_numpy=False): raise RuntimeError("TransferDataset is not iterable.") diff --git a/tests/st/ops/ascend/test_tensor_print/tensor_print_utils.py b/tests/st/ops/ascend/test_tensor_print/tensor_print_utils.py index 38fb0e60b51..9178bd0ecc3 100644 --- a/tests/st/ops/ascend/test_tensor_print/tensor_print_utils.py +++ b/tests/st/ops/ascend/test_tensor_print/tensor_print_utils.py @@ -54,20 +54,15 @@ def get_tensor(is_scalar, input_type): if __name__ == "__main__": net = TensorPrint() - # net(get_tensor('scalar', mindspore.bool_), get_tensor('scalar', mindspore.uint8), - # get_tensor('scalar', mindspore.int8), get_tensor('scalar', mindspore.uint16), - # get_tensor('scalar', mindspore.int16), get_tensor('scalar', mindspore.uint32), - # get_tensor('scalar', mindspore.int32), get_tensor('scalar', mindspore.uint64), - # get_tensor('scalar', mindspore.int64), get_tensor('scalar', mindspore.float16), - # get_tensor('scalar', mindspore.float32), get_tensor('scalar', mindspore.float64), - # get_tensor('array', mindspore.bool_), get_tensor('array', mindspore.uint8), - # get_tensor('array', mindspore.int8), get_tensor('array', mindspore.uint16), - # get_tensor('array', mindspore.int16), get_tensor('array', mindspore.uint32), - # get_tensor('array', mindspore.int32), get_tensor('array', mindspore.uint64), - # get_tensor('array', mindspore.int64), get_tensor('array', mindspore.float16), - # get_tensor('array', mindspore.float32), get_tensor('array', mindspore.float64)) - - net(get_tensor('scalar', mindspore.bool_), + net(get_tensor('scalar', mindspore.bool_), get_tensor('scalar', mindspore.uint8), + get_tensor('scalar', mindspore.int8), get_tensor('scalar', mindspore.uint16), + get_tensor('scalar', mindspore.int16), get_tensor('scalar', mindspore.uint32), + get_tensor('scalar', mindspore.int32), get_tensor('scalar', mindspore.uint64), + get_tensor('scalar', mindspore.int64), get_tensor('scalar', mindspore.float16), get_tensor('scalar', mindspore.float32), get_tensor('scalar', mindspore.float64), - get_tensor('array', mindspore.bool_), + get_tensor('array', mindspore.bool_), get_tensor('array', mindspore.uint8), + get_tensor('array', mindspore.int8), get_tensor('array', mindspore.uint16), + get_tensor('array', mindspore.int16), get_tensor('array', mindspore.uint32), + get_tensor('array', mindspore.int32), get_tensor('array', mindspore.uint64), + get_tensor('array', mindspore.int64), get_tensor('array', mindspore.float16), get_tensor('array', mindspore.float32), get_tensor('array', mindspore.float64))