!40723 rename DeviceQueueOp=>DataQueueOp, TransferNode=>DataQueueNode

Merge pull request !40723 from zhoufeng/delete-macro
i-robot 2022-08-29 02:24:58 +00:00 committed by Gitee
commit aaeae5d3ae
46 changed files with 278 additions and 282 deletions
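
A minimal sketch of how the renamed classes are constructed after this change. The include paths are the new ones introduced in the hunks below, the constructor signatures mirror data_queue_node.h and data_queue_op.h as shown further down, and the helper name and literal argument values are illustrative placeholders only:

#include <memory>
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"

// Sketch only: build the renamed IR node and runtime op with placeholder values.
void BuildRenamedObjectsSketch() {
  auto node = std::make_shared<mindspore::dataset::DataQueueNode>(
      /*child=*/nullptr, /*queue_name=*/"queue_0", /*device_type=*/"Ascend",
      /*device_id=*/0, /*send_epoch_end=*/true, /*total_batch=*/0,
      /*create_data_info_queue=*/false);
  auto op = std::make_shared<mindspore::dataset::DataQueueOp>(
      "queue_0", mindspore::dataset::DataQueueOp::DeviceType::Ascend,
      /*device_id=*/0, /*send_epoch_end=*/true, /*total_batch=*/0,
      /*create_data_info_queue=*/false);
  (void)node;
  (void)op;
}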

View File

@ -30,12 +30,13 @@ class DeviceContext;
enum class DataQueueStatus : int { SUCCESS = 0, QUEUE_EXIST, QUEUE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
struct DataQueueItem {
int32_t worker_id_{0};
std::string data_type_;
size_t data_len_{0};
void *data_ptr_{nullptr};
std::vector<int64_t> shapes_;
void *device_addr_{nullptr};
int32_t worker_id{0};
std::string data_type;
size_t data_len{0};
void *data_ptr{nullptr};
std::vector<int64_t> shapes;
void *device_addr{nullptr};
// add tensor type when tdt need more types than data and end-of-sequence
};
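
The members above drop their trailing underscores. A minimal sketch of filling one item with the renamed fields, matching the usage in ConvertTensorRowToDataQueueItem later in this diff; the helper name, buffer, shape, and type string are placeholders, and the include path is an assumption based on where this header lives:

#include <cstddef>
#include <cstdint>
#include "include/backend/data_queue/data_queue.h"  // assumed path of the header shown in this hunk

// Sketch only: populate a DataQueueItem using the underscore-free member names.
mindspore::device::DataQueueItem MakeExampleItem(void *host_buffer, size_t nbytes) {
  mindspore::device::DataQueueItem item;
  item.worker_id = 0;                    // copy worker that owns host_buffer (placeholder)
  item.data_type = "float32";            // placeholder type string
  item.data_len = nbytes;                // payload size in bytes
  item.data_ptr = host_buffer;           // host memory provided by the caller
  item.shapes = {1, static_cast<int64_t>(nbytes / sizeof(float))};  // placeholder shape
  return item;
}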
class DataQueue {
@ -58,8 +59,6 @@ class DataQueue {
virtual size_t Size() { return size_; }
virtual size_t Capacity() { return capacity_; }
virtual std::shared_ptr<void> AllocHostMem(size_t size) { return std::shared_ptr<void>(::malloc(size), ::free); }
protected:
size_t head_;
size_t tail_;

View File

@ -99,8 +99,7 @@ class BACKEND_EXPORT DataQueueMgr {
DataQueueStatus CreateDynamicBufQueue(const std::string &channel_name, const size_t &capacity);
DataQueueStatus OpenDynamicBufQueue(const std::string &channel_name);
std::shared_ptr<DataQueue> GetDataQueue(const std::string &channel_name) const;
DataQueueStatus SetThreadDevice(const std::string &device_name);
std::shared_ptr<void> AllocHostMem(const std::string &device_name, size_t size);
DataQueueStatus SetThreadDevice(const std::string &channel_name);
void Close(const std::string &channel_name) const noexcept;
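
SetThreadDevice is now keyed by the channel name rather than by a device-name string, and the AllocHostMem helper on the removed line is gone from this interface. A minimal sketch of the new call, matching its use in DataQueueOp::SetThreadDevice further down; the wrapper function name is illustrative and the include path is an assumption:

#include <string>
#include "include/backend/data_queue/data_queue_mgr.h"  // assumed path of the header shown in this hunk

// Sketch only: bind the calling thread to the device behind an already-created channel.
void BindThreadToChannelDevice(const std::string &channel_name) {
  (void)mindspore::device::DataQueueMgr::GetInstance().SetThreadDevice(channel_name);
}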

View File

@ -66,7 +66,7 @@
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/ir/datasetops/skip_node.h"
#include "minddata/dataset/engine/ir/datasetops/take_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include "minddata/dataset/engine/ir/datasetops/zip_node.h"
#endif
@ -228,10 +228,10 @@ bool Dataset::DeviceQueueCharIF(const std::vector<char> &queue_name, const std::
return false;
}
// Add TransferNode IR on top of dataset
// Add DataQueueNode IR on top of dataset
auto ds =
std::make_shared<TransferNode>(shared_from_this()->IRNode(), CharToString(queue_name), CharToString(device_type),
device_id, send_epoch_end, total_batches, create_data_info_queue);
std::make_shared<DataQueueNode>(shared_from_this()->IRNode(), CharToString(queue_name), CharToString(device_type),
device_id, send_epoch_end, total_batches, create_data_info_queue);
// Get ToDevice consumer
auto consumer = std::make_unique<ToDevice>(num_epochs);

View File

@ -37,7 +37,7 @@
#include "minddata/dataset/engine/ir/datasetops/shuffle_node.h"
#include "minddata/dataset/engine/ir/datasetops/skip_node.h"
#include "minddata/dataset/engine/ir/datasetops/take_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include "minddata/dataset/engine/ir/datasetops/zip_node.h"
// IR non-leaf nodes - for android
@ -294,13 +294,13 @@ PYBIND_REGISTER(TakeNode, 2, ([](const py::module *m) {
}));
}));
PYBIND_REGISTER(TransferNode, 2, ([](const py::module *m) {
(void)py::class_<TransferNode, DatasetNode, std::shared_ptr<TransferNode>>(*m, "TransferNode",
"to create a TransferNode")
PYBIND_REGISTER(DataQueueNode, 2, ([](const py::module *m) {
(void)py::class_<DataQueueNode, DatasetNode, std::shared_ptr<DataQueueNode>>(
*m, "DataQueueNode", "to create a DataQueueNode")
.def(py::init([](const std::shared_ptr<DatasetNode> &self, const std::string &queue_name,
const std::string &device_type, int32_t device_id, bool send_epoch_end,
int32_t total_batch, bool create_data_info_queue) {
auto transfer = std::make_shared<TransferNode>(
auto transfer = std::make_shared<DataQueueNode>(
self, queue_name, device_type, device_id, send_epoch_end, total_batch, create_data_info_queue);
THROW_IF_ERROR(transfer->ValidateParams());
return transfer;

View File

@ -41,7 +41,7 @@
#include "minddata/dataset/engine/datasetops/batch_op.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/engine/datasetops/project_op.h"
#include "minddata/dataset/engine/datasetops/rename_op.h"

View File

@ -21,7 +21,7 @@
#include <utility>
#include <vector>
#include "minddata/dataset/engine/consumers/tree_consumer.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include "minddata/dataset/engine/opt/pre/getter_pass.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/auto_tune.h"
@ -298,11 +298,11 @@ Status ToDevice::Send() {
}
Status ToDevice::Continue() {
// tree_.root() must be DeviceQueueOp
// tree_.root() must be DataQueueOp
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "ContinueSend only supported by DeviceQueueOp");
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "ContinueSend only supported by DataQueueOp");
op->ContinueSend();
return Status::OK();
}
@ -310,8 +310,8 @@ Status ToDevice::Continue() {
Status ToDevice::Stop() {
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "StopSend only supported by DeviceQueueOp");
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "StopSend only supported by DataQueueOp");
op->StopSend();
return Status::OK();
@ -320,11 +320,11 @@ Status ToDevice::Stop() {
Status ToDevice::GetDataInfo(std::vector<DataType> *const types, std::vector<TensorShape> *const shapes) {
RETURN_UNEXPECTED_IF_NULL(types);
RETURN_UNEXPECTED_IF_NULL(shapes);
// tree_.root() must be DeviceQueueOp
// tree_.root() must be DataQueueOp
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "GetDataInfo only supported by DeviceQueueOp");
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "GetDataInfo only supported by DataQueueOp");
DATA_INFO data_info;
RETURN_IF_NOT_OK(op->GetDataInfo(&data_info));
for (auto el : data_info) {
@ -338,8 +338,8 @@ Status ToDevice::Terminate() {
#ifdef ENABLE_TDTQUE
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "StopSend only supported by DeviceQueueOp");
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "StopSend only supported by DataQueueOp");
op->StopWaiting();
#endif
return TreeConsumer::Terminate();
@ -358,12 +358,12 @@ Status TreeConsumer::Reset(int64_t step) {
RETURN_IF_NOT_OK(this->Terminate());
}
#ifdef WITH_BACKEND
MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice) {
// clear the device if GPU is used.
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root.get());
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
if (op != nullptr) {
MS_LOG(INFO) << "Clearing the GPU device";
RETURN_IF_NOT_OK(op->ClearDevice());

View File

@ -8,7 +8,7 @@ set(DATASET_ENGINE_DATASETOPS_SRC_FILES
dataset_op.cc
pipeline_op.cc
batch_op.cc
device_queue_op.cc
data_queue_op.cc
project_op.cc
rename_op.cc
repeat_op.cc

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include <algorithm>
#include <iostream>
@ -41,17 +41,17 @@ std::vector<DataQueueItem> ConvertTensorRowToDataQueueItem(const TensorRow &row)
std::vector<device::DataQueueItem> items;
for (auto &i : row) {
device::DataQueueItem data_item;
data_item.data_len_ = static_cast<size_t>(i->SizeInBytes());
data_item.shapes_ = i->shape().AsVector();
data_item.data_ptr_ = const_cast<void *>(static_cast<const void *>(i->GetBuffer()));
data_item.data_type_ = i->type().ToString();
data_item.data_len = static_cast<size_t>(i->SizeInBytes());
data_item.shapes = i->shape().AsVector();
data_item.data_ptr = const_cast<void *>(static_cast<const void *>(i->GetBuffer()));
data_item.data_type = i->type().ToString();
items.emplace_back(std::move(data_item));
}
return items;
}
} // namespace
DeviceQueueOp::DeviceQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id,
bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
DataQueueOp::DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
int32_t total_batch, bool create_data_info_queue)
: PipelineOp(1),
channel_name_(channel_name),
device_type_(device_type),
@ -87,7 +87,7 @@ DeviceQueueOp::DeviceQueueOp(const std::string channel_name, DeviceType device_t
#endif
}
DeviceQueueOp::~DeviceQueueOp() {
DataQueueOp::~DataQueueOp() {
#ifdef ENABLE_DUMP_IR
std::string rdr_msg = md_channel_info_->ToString();
if (!send_finished_ && !rdr_msg.empty()) {
@ -96,18 +96,18 @@ DeviceQueueOp::~DeviceQueueOp() {
#endif
}
void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) {
void DataQueueOp::ReleaseData(void *addr, int32_t worker_id) {
if (addr != nullptr && worker_id >= 0 && worker_id < pool_.size()) {
pool_[worker_id]->Deallocate(addr);
}
}
Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
Status DataQueueOp::EoeReceived(int32_t worker_id) {
state_ = OpState::kDeOpIdle;
return Status::OK();
}
Status DeviceQueueOp::FilterMetadata(TensorRow *row) {
Status DataQueueOp::FilterMetadata(TensorRow *row) {
std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
TensorRow output;
TensorRow tmp = *row;
@ -134,7 +134,7 @@ Status DeviceQueueOp::FilterMetadata(TensorRow *row) {
return Status::OK();
}
Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
Status DataQueueOp::CheckExceptions(const TensorRow &row) const {
// this method checks if the row meets the conditions to be sent to TDT
for (const auto &item : row) {
CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid datatype, cannot send string data to device.");
@ -143,10 +143,10 @@ Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
return Status::OK();
}
Status DeviceQueueOp::operator()() {
Status DataQueueOp::operator()() {
#ifndef ENABLE_SECURITY
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
"Detect first batch", std::bind(&DeviceQueueOp::DetectFirstBatch, this), nullptr, id()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Detect first batch",
std::bind(&DataQueueOp::DetectFirstBatch, this), nullptr, id()));
#endif
TaskManager::FindMe()->Post();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
@ -187,7 +187,7 @@ Status DeviceQueueOp::operator()() {
return Status::OK();
}
Status DeviceQueueOp::SendDataToAscend() {
Status DataQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend.";
#ifndef ENABLE_SECURITY
uint64_t batch_start_time = 0;
@ -304,8 +304,8 @@ Status DeviceQueueOp::SendDataToAscend() {
return Status::OK();
}
Status DeviceQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable,
int32_t *tdt_cost, bool *is_break_loop) {
Status DataQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable, int32_t *tdt_cost,
bool *is_break_loop) {
RETURN_UNEXPECTED_IF_NULL(tdt_cost);
RETURN_UNEXPECTED_IF_NULL(is_break_loop);
if (curr_row.eoe() && send_epoch_end_ && ascend_data_queue_->IsOpen()) {
@ -320,7 +320,7 @@ Status DeviceQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool
#ifndef ENABLE_SECURITY
if (is_profiling_enable) {
double end_time = ProfilingTime::GetCurMilliSecond();
// compile error occurring when MS_EXCEPTION_IF_NULL
RETURN_UNEXPECTED_IF_NULL(tdt_cost);
*tdt_cost = static_cast<int32_t>(end_time - start_time);
}
#endif
@ -332,14 +332,14 @@ Status DeviceQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool
return Status::OK();
}
void DeviceQueueOp::WaitContinueSignal() const {
void DataQueueOp::WaitContinueSignal() const {
while (stop_send_ && ascend_keep_waiting_) {
MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
void DeviceQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg) {
void DataQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg) {
while (send_batch >= *sending_num) {
*sending_num = cfg->sending_batches();
if (*sending_num == 0) {
@ -351,7 +351,7 @@ void DeviceQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num
}
}
Status DeviceQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
Status DataQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
std::vector<device::DataQueueItem> items = ConvertTensorRowToDataQueueItem(curr_row);
#ifndef ENABLE_SECURITY
double start_time = 0;
@ -363,7 +363,7 @@ Status DeviceQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable,
#ifndef ENABLE_SECURITY
if (is_profiling_enable) {
double end_time = ProfilingTime::GetCurMilliSecond();
// compile error occurring when MS_EXCEPTION_IF_NULL
RETURN_UNEXPECTED_IF_NULL(tdt_cost);
*tdt_cost = static_cast<int32_t>(end_time - start_time);
}
#endif
@ -387,8 +387,7 @@ Status DeviceQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable,
return Status::OK();
}
Status DeviceQueueOp::CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished,
bool *is_break_loop) {
Status DataQueueOp::CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished, bool *is_break_loop) {
if (status != DataQueueStatus::SUCCESS) {
if (stop_send) {
*send_finished = true;
@ -410,9 +409,9 @@ Status DeviceQueueOp::CheckPushStatus(DataQueueStatus status, bool stop_send, bo
return Status::OK();
}
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
Status DataQueueOp::GetDataInfo(DATA_INFO *data_info) {
#ifdef WITH_BACKEND
MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) != kAscendDevice) {
RETURN_STATUS_UNEXPECTED("'GetDataInfo' only supported on Ascend.");
}
@ -434,14 +433,14 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
return Status::OK();
}
Status DeviceQueueOp::SetThreadDevice() {
Status DataQueueOp::SetThreadDevice() {
#ifdef WITH_BACKEND
(void)device::DataQueueMgr::GetInstance().SetThreadDevice("GPU");
(void)device::DataQueueMgr::GetInstance().SetThreadDevice(channel_name_);
#endif
return Status::OK();
}
Status DeviceQueueOp::LaunchParallelCopyThread() {
Status DataQueueOp::LaunchParallelCopyThread() {
#ifdef WITH_BACKEND
RETURN_UNEXPECTED_IF_NULL(tree_);
// Every thread use cuda api should SetThreadDevice
@ -449,7 +448,7 @@ Status DeviceQueueOp::LaunchParallelCopyThread() {
// CircularPool may not safe under multi-threads scenario, so one worker with one pool
for (int i = 0; i < num_workers_; i++) {
std::shared_ptr<MemoryPool> pool;
MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(
&pool, -1, kDeviceQueGpuThreadMemory, false,
MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice));
@ -459,14 +458,14 @@ Status DeviceQueueOp::LaunchParallelCopyThread() {
receive_queues_.Init(num_workers_, queue_capacity_);
RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
tree_->LaunchWorkers(num_workers_, std::bind(&DataQueueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
std::bind(&DeviceQueueOp::PushDataToGPU, this), nullptr, id()));
std::bind(&DataQueueOp::PushDataToGPU, this), nullptr, id()));
#endif
return Status::OK();
}
bool DeviceQueueOp::NoExceptionRaised() {
bool DataQueueOp::NoExceptionRaised() {
#ifdef WITH_BACKEND
return !TaskManager::FindMe()->Interrupted() && !device::DataQueueMgr::GetInstance().IsClosed();
#else
@ -474,7 +473,7 @@ bool DeviceQueueOp::NoExceptionRaised() {
#endif
}
Status DeviceQueueOp::PushDataToGPU() {
Status DataQueueOp::PushDataToGPU() {
#ifdef WITH_BACKEND
RETURN_UNEXPECTED_IF_NULL(tree_);
// Every thread use cuda api should SetThreadDevice
@ -502,7 +501,7 @@ Status DeviceQueueOp::PushDataToGPU() {
auto items = std::move(item.data_item);
bool eoe_flag = item.eoe_flag;
int64_t send_batch = 0;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
DataQueueStatus ret;
if (dynamic_shape_) {
ret = device::DataQueueMgr::GetInstance().OpenDynamicBufQueue(channel_name_, release_function);
@ -522,8 +521,8 @@ Status DeviceQueueOp::PushDataToGPU() {
#endif
// Data prefetch only when PS mode enables cache.
#ifndef _WIN32
if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_,
items[0].data_type_)) {
if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr, items[0].data_len,
items[0].data_type)) {
RETURN_STATUS_ERROR(StatusCode::kMDTimeOut,
"[Internal ERROR] Failed to prefetch data in current PS mode(cache data when sending).");
}
@ -577,8 +576,8 @@ Status DeviceQueueOp::PushDataToGPU() {
return Status::OK();
}
// WorkEntry of DeviceQueueOp just do multi_threads memcpy for performance optimization.
Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
// WorkerEntry of DataQueueOp just does a multi-threaded memcpy for performance optimization.
Status DataQueueOp::WorkerEntry(int32_t worker_id) {
// Every thread use cuda api should SetThreadDevice
RETURN_IF_NOT_OK(SetThreadDevice());
TaskManager::FindMe()->Post();
@ -591,10 +590,10 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
std::vector<device::DataQueueItem> items;
for (auto &i : current_row) {
device::DataQueueItem data_item;
data_item.data_len_ = static_cast<size_t>(i->SizeInBytes());
data_item.shapes_ = i->shape().AsVector();
data_item.data_ptr_ = nullptr;
data_item.worker_id_ = worker_id;
data_item.data_len = static_cast<size_t>(i->SizeInBytes());
data_item.shapes = i->shape().AsVector();
data_item.data_ptr = nullptr;
data_item.worker_id = worker_id;
items.push_back(data_item);
}
@ -616,7 +615,7 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
return Status::OK();
}
Status DeviceQueueOp::SendDataToGPU() {
Status DataQueueOp::SendDataToGPU() {
#ifdef WITH_BACKEND
RETURN_IF_NOT_OK(LaunchParallelCopyThread());
MS_LOG(INFO) << "Device queue, sending data to GPU.";
@ -669,25 +668,27 @@ Status DeviceQueueOp::SendDataToGPU() {
}
MS_LOG(INFO) << "Device queue received number of batches and EOEs: " << (num_buf - num_workers_);
#else
MS_LOG(WARNING) << "Gpu queue is not supported in ut tests.";
#endif
return Status::OK();
}
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
const int32_t &worker_id) {
Status DataQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
const int32_t &worker_id) {
int i = 0;
for (auto &sub_item : *items) {
auto rc = pool_[worker_id]->Allocate(sub_item.data_len_, &sub_item.data_ptr_);
if (rc.IsError() || sub_item.data_ptr_ == nullptr) {
auto rc = pool_[worker_id]->Allocate(sub_item.data_len, &sub_item.data_ptr);
if (rc.IsError() || sub_item.data_ptr == nullptr) {
RETURN_STATUS_OOM("Memory malloc failed, check memory usage.");
}
if (curr_row[i] == nullptr) {
MS_LOG(ERROR) << "[Internal ERROR] The pointer curr_row[" << i << "] is null";
RETURN_STATUS_UNEXPECTED("[Internal ERROR] TensorRow 'curr_row' contains nullptr.");
}
sub_item.data_type_ = curr_row[i]->type().ToString();
sub_item.data_type = curr_row[i]->type().ToString();
const unsigned char *column_data = curr_row[i]->GetBuffer();
if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
if (memcpy_s(sub_item.data_ptr, sub_item.data_len, column_data,
static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
MS_LOG(ERROR) << "[Internal ERROR] memcpy_s failed.";
RETURN_STATUS_UNEXPECTED("[Internal ERROR] memcpy_s failed.");
@ -697,10 +698,10 @@ Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items
return Status::OK();
}
Status DeviceQueueOp::ClearDevice() {
Status DataQueueOp::ClearDevice() {
#ifdef WITH_BACKEND
MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
if (ret != DataQueueStatus::SUCCESS) {
RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for clearing the device.");
@ -715,7 +716,7 @@ Status DeviceQueueOp::ClearDevice() {
return Status::OK();
}
Status DeviceQueueOp::SendDataToCPU() {
Status DataQueueOp::SendDataToCPU() {
MS_LOG(INFO) << "Device queue, sending data to CPU.";
int64_t total_batch = 0;
@ -742,7 +743,7 @@ Status DeviceQueueOp::SendDataToCPU() {
return Status::OK();
}
void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
void DataQueueOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
PipelineOp::Print(out, show_all);
@ -757,9 +758,9 @@ void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
}
#ifndef ENABLE_SECURITY
void DeviceQueueOp::ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
void DataQueueOp::ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
// Record the pipeline profiling info
if (is_profiling_enable) {
*end_time = ProfilingTime::GetCurMilliSecond();
@ -776,7 +777,7 @@ void DeviceQueueOp::ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<
}
}
Status DeviceQueueOp::DetectFirstBatch() {
Status DataQueueOp::DetectFirstBatch() {
TaskManager::FindMe()->Post();
uint8_t count_num = 0;
uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
@ -800,7 +801,7 @@ Status DeviceQueueOp::DetectFirstBatch() {
return Status::OK();
}
void DeviceQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
void DataQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
*end_time = ProfilingTime::GetCurMilliSecond();
if (*end_time - *start_time > kTimeOutMilliSeconds) {
MS_LOG(WARNING) << "Bad performance attention, it takes more than 25 seconds to fetch a batch of data from dataset "
@ -810,13 +811,13 @@ void DeviceQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end
}
#endif
void DeviceQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) {
void DataQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) {
if (first_push_flag != true) {
MS_LOG(INFO) << "Loading dataset and begin to push first batch into device ...";
}
}
void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
void DataQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
if (!first_push_flag) {
MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr";
return;
@ -826,8 +827,7 @@ void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
*first_push_flag = true;
}
}
Status DeviceQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, const bool profiling,
uint64_t *push_time) {
Status DataQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, const bool profiling, uint64_t *push_time) {
#ifdef WITH_BACKEND
bool flag_log = false;
#ifndef ENABLE_SECURITY
@ -867,7 +867,7 @@ Status DeviceQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, con
return Status::OK();
}
Status DeviceQueueOp::SendDataToAscendDynamic() {
Status DataQueueOp::SendDataToAscendDynamic() {
#ifdef WITH_BACKEND
MS_LOG(DEBUG) << "Dynamic Device queue, sending data to Ascend.";

View File

@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_
#include <memory>
#include <string>
@ -44,7 +44,7 @@ using mindspore::device::DataQueueStatus;
constexpr int32_t kTimeOutMilliSeconds = 25000;
const int kDataInfoQueueCapacity = 128;
class DeviceQueueOp : public PipelineOp {
class DataQueueOp : public PipelineOp {
public:
static const uint32_t INVALID_HANDLE = 0xffffffffUL;
const uint32_t WAIT_TIME = 5;
@ -53,12 +53,12 @@ class DeviceQueueOp : public PipelineOp {
// Name: constructor
// Description
DeviceQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
int32_t total_batch, bool create_data_info_queue);
DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
int32_t total_batch, bool create_data_info_queue);
// Name: destructor
// Description
~DeviceQueueOp();
~DataQueueOp();
/// \brief Getter function
/// \return connector size of current op
@ -85,7 +85,7 @@ class DeviceQueueOp : public PipelineOp {
bool show_all) const override; // In: T/F if it should print everything
// Provide stream operator for displaying it
friend std::ostream &operator<<(std::ostream &out, const DeviceQueueOp &to) {
friend std::ostream &operator<<(std::ostream &out, const DataQueueOp &to) {
to.Print(out, false);
return out;
}
@ -108,7 +108,7 @@ class DeviceQueueOp : public PipelineOp {
Status FilterMetadata(TensorRow *row);
// Name: CheckExceptions(TensorRow);
// Description: Check whether the TensorRow meets the condition for performing DeviceQueueOp
// Description: Check whether the TensorRow meets the condition for performing DataQueueOp
Status CheckExceptions(const TensorRow &row) const;
// Name: PrintBeginInfoWhenFirstBatch(bool)
@ -181,4 +181,4 @@ class DeviceQueueOp : public PipelineOp {
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_

View File

@ -23,7 +23,7 @@
#include <string>
#include <algorithm>
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/engine/operator_connector.h"
@ -54,7 +54,7 @@ DatasetOp::DatasetOp(int32_t op_connector_size, std::shared_ptr<SamplerRT> sampl
// Adds a operator to become our child.
Status DatasetOp::AddChild(std::shared_ptr<DatasetOp> child) {
if (std::dynamic_pointer_cast<DeviceQueueOp>(child) != nullptr) {
if (std::dynamic_pointer_cast<DataQueueOp>(child) != nullptr) {
std::string err_msg(
"Unsupported scenario, \'send\' operator can only be after \'device_queue\' operation, but got " + Name());
RETURN_STATUS_UNEXPECTED(err_msg);

View File

@ -42,7 +42,7 @@ constexpr char kCacheMergeOp[] = "CacheMergeOp";
constexpr char kCacheOp[] = "CacheOp";
constexpr char kConcatOp[] = "ConcatOp";
constexpr char kDatasetOp[] = "DatasetOp";
constexpr char kDeviceQueueOp[] = "DeviceQueueOp";
constexpr char kDeviceQueueOp[] = "DataQueueOp";
constexpr char kEpochCtrlOp[] = "EpochCtrlOp";
constexpr char kFilterOp[] = "FilterOp";
constexpr char kMapOp[] = "MapOp";

View File

@ -18,7 +18,7 @@
#include <string>
#include <limits>
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#ifdef WITH_BACKEND
#include "mindspore/core/utils/numa_interface.h"
#endif
@ -57,7 +57,7 @@ ExecutionTree::~ExecutionTree() {
handle_ = nullptr;
}
#if defined(ENABLE_TDTQUE)
DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root_.get());
DataQueueOp *op = dynamic_cast<DataQueueOp *>(root_.get());
if (op != nullptr) {
op->StopWaiting();
}

View File

@ -23,7 +23,7 @@ set(DATASET_ENGINE_IR_DATASETOPS_SRC_FILES
skip_node.cc
sync_wait_node.cc
take_node.cc
transfer_node.cc
data_queue_node.cc
zip_node.cc
)

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2021 Huawei Technologies Co., Ltd
* Copyright 2020-2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,14 +14,14 @@
* limitations under the License.
*/
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/util/status.h"
@ -29,10 +29,9 @@
namespace mindspore {
namespace dataset {
// Constructor for TransferNode
TransferNode::TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type,
int32_t device_id, bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
// Constructor for DataQueueNode
DataQueueNode::DataQueueNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type,
int32_t device_id, bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
: queue_name_(std::move(queue_name)),
device_id_(device_id),
device_type_(std::move(device_type)),
@ -42,26 +41,26 @@ TransferNode::TransferNode(std::shared_ptr<DatasetNode> child, std::string queue
this->AddChild(child);
}
std::shared_ptr<DatasetNode> TransferNode::Copy() {
auto node = std::make_shared<TransferNode>(nullptr, queue_name_, device_type_, device_id_, send_epoch_end_,
total_batch_, create_data_info_queue_);
std::shared_ptr<DatasetNode> DataQueueNode::Copy() {
auto node = std::make_shared<DataQueueNode>(nullptr, queue_name_, device_type_, device_id_, send_epoch_end_,
total_batch_, create_data_info_queue_);
return node;
}
void TransferNode::Print(std::ostream &out) const {
void DataQueueNode::Print(std::ostream &out) const {
out << (Name() + ",send_epoch_end:" + (send_epoch_end_ ? "true" : "false") +
",total_batch:" + std::to_string(total_batch_) + ")");
}
// Validator for TransferNode
Status TransferNode::ValidateParams() {
// Validator for DataQueueNode
Status DataQueueNode::ValidateParams() {
RETURN_IF_NOT_OK(DatasetNode::ValidateParams());
RETURN_IF_NOT_OK(ValidateScalar("Transfer", "Total batches", total_batch_, {0}, false));
return Status::OK();
}
// Function to build TransferNode
Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
// Function to build DataQueueNode
Status DataQueueNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
if (queue_name_.empty()) {
// Get a uuid for queue name
queue_name_ = Services::GetUniqueID();
@ -77,21 +76,21 @@ Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
// Get device type from ms context
// Convert device_type_ from string to DeviceType
DeviceQueueOp::DeviceType type;
DataQueueOp::DeviceType type;
if (device_type_ == kCPUDevice) {
type = DeviceQueueOp::DeviceType::CPU;
type = DataQueueOp::DeviceType::CPU;
} else if (device_type_ == kGPUDevice) {
type = DeviceQueueOp::DeviceType::GPU;
type = DataQueueOp::DeviceType::GPU;
} else if (device_type_ == kAscendDevice) {
type = DeviceQueueOp::DeviceType::Ascend;
type = DataQueueOp::DeviceType::Ascend;
} else {
std::string err_msg = "Unknown device target, support CPU, GPU or Ascend";
MS_LOG(ERROR) << err_msg;
RETURN_STATUS_UNEXPECTED(err_msg);
}
auto op = std::make_shared<DeviceQueueOp>(queue_name_, type, device_id_, send_epoch_end_, total_batch_,
create_data_info_queue_);
auto op = std::make_shared<DataQueueOp>(queue_name_, type, device_id_, send_epoch_end_, total_batch_,
create_data_info_queue_);
op->SetTotalRepeats(GetTotalRepeats());
op->SetNumRepeatsPerEpoch(GetNumRepeatsPerEpoch());
node_ops->push_back(op);
@ -99,18 +98,18 @@ Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
}
// Visitor accepting method for IRNodePass
Status TransferNode::Accept(IRNodePass *const p, bool *const modified) {
Status DataQueueNode::Accept(IRNodePass *const p, bool *const modified) {
// Downcast shared pointer then call visitor
return p->Visit(shared_from_base<TransferNode>(), modified);
return p->Visit(shared_from_base<DataQueueNode>(), modified);
}
// Visitor accepting method for IRNodePass
Status TransferNode::AcceptAfter(IRNodePass *const p, bool *const modified) {
Status DataQueueNode::AcceptAfter(IRNodePass *const p, bool *const modified) {
// Downcast shared pointer then call visitor
return p->VisitAfter(shared_from_base<TransferNode>(), modified);
return p->VisitAfter(shared_from_base<DataQueueNode>(), modified);
}
Status TransferNode::to_json(nlohmann::json *out_json) {
Status DataQueueNode::to_json(nlohmann::json *out_json) {
nlohmann::json args;
args["queue_name"] = queue_name_;
args["device_type"] = device_type_;
@ -122,8 +121,8 @@ Status TransferNode::to_json(nlohmann::json *out_json) {
return Status::OK();
}
Status TransferNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> ds,
std::shared_ptr<DatasetNode> *result) {
Status DataQueueNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> ds,
std::shared_ptr<DatasetNode> *result) {
RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "queue_name", kTransferNode));
RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "device_type", kTransferNode));
RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "device_id", kTransferNode));
@ -136,8 +135,8 @@ Status TransferNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetN
bool send_epoch_end = json_obj["send_epoch_end"];
int32_t total_batch = json_obj["total_batch"];
bool create_data_info_queue = json_obj["create_data_info_queue"];
*result = std::make_shared<TransferNode>(ds, queue_name, device_type, device_id, send_epoch_end, total_batch,
create_data_info_queue);
*result = std::make_shared<DataQueueNode>(ds, queue_name, device_type, device_id, send_epoch_end, total_batch,
create_data_info_queue);
return Status::OK();
}
} // namespace dataset
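
For reference, a hedged sketch of the JSON fields that DataQueueNode::to_json writes and from_json reads back, as listed in the hunks above; the helper name and the literal values are placeholders, not defaults:

#include <nlohmann/json.hpp>

// Sketch only: the keys used by DataQueueNode::to_json / from_json.
nlohmann::json MakeExampleDataQueueNodeJson() {
  nlohmann::json args;
  args["queue_name"] = "queue_0";
  args["device_type"] = "Ascend";
  args["device_id"] = 0;
  args["send_epoch_end"] = true;
  args["total_batch"] = 0;
  args["create_data_info_queue"] = false;
  return args;
}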

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_TRANSFER_NODE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_TRANSFER_NODE_H_
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_DATA_QUEUE_NODE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_DATA_QUEUE_NODE_H_
#include <memory>
#include <string>
@ -25,14 +25,14 @@
namespace mindspore {
namespace dataset {
class TransferNode : public DatasetNode {
class DataQueueNode : public DatasetNode {
public:
/// \brief Constructor
TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, int32_t device_id,
bool send_epoch_end, int32_t total_batch, bool create_data_info_queue);
DataQueueNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, int32_t device_id,
bool send_epoch_end, int32_t total_batch, bool create_data_info_queue);
/// \brief Destructor
~TransferNode() override = default;
~DataQueueNode() override = default;
/// \brief Node name getter
/// \return Name of the current node
@ -100,4 +100,4 @@ class TransferNode : public DatasetNode {
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_TRANSFER_NODE_H_
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_SOURCE_DATA_QUEUE_NODE_H_

View File

@ -48,7 +48,7 @@
#include "minddata/dataset/engine/ir/datasetops/sync_wait_node.h"
#endif
#include "minddata/dataset/engine/ir/datasetops/take_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include "minddata/dataset/engine/ir/datasetops/zip_node.h"
namespace mindspore {
@ -255,10 +255,10 @@ Status IRNodePass::Visit(std::shared_ptr<TFRecordNode> node, bool *const modifie
Status IRNodePass::VisitAfter(std::shared_ptr<TFRecordNode> node, bool *const modified) {
return VisitAfter(std::static_pointer_cast<NonMappableSourceNode>(node), modified);
}
Status IRNodePass::Visit(std::shared_ptr<TransferNode> node, bool *const modified) {
Status IRNodePass::Visit(std::shared_ptr<DataQueueNode> node, bool *const modified) {
return Visit(std::static_pointer_cast<DatasetNode>(node), modified);
}
Status IRNodePass::VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) {
Status IRNodePass::VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) {
return VisitAfter(std::static_pointer_cast<DatasetNode>(node), modified);
}
Status IRNodePass::Visit(std::shared_ptr<ZipNode> node, bool *const modified) {

View File

@ -47,7 +47,7 @@ class ShuffleNode;
class SkipNode;
class TakeNode;
class TFRecordNode;
class TransferNode;
class DataQueueNode;
class ZipNode;
#ifdef ENABLE_PYTHON
class SyncWaitNode;
@ -212,8 +212,8 @@ class IRNodePass : public IRPass {
virtual Status VisitAfter(std::shared_ptr<TakeNode> node, bool *const modified);
virtual Status Visit(std::shared_ptr<TFRecordNode> node, bool *const modified);
virtual Status VisitAfter(std::shared_ptr<TFRecordNode> node, bool *const modified);
virtual Status Visit(std::shared_ptr<TransferNode> node, bool *const modified);
virtual Status VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified);
virtual Status Visit(std::shared_ptr<DataQueueNode> node, bool *const modified);
virtual Status VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified);
virtual Status Visit(std::shared_ptr<ZipNode> node, bool *const modified);
virtual Status VisitAfter(std::shared_ptr<ZipNode> node, bool *const modified);

View File

@ -23,7 +23,7 @@
#include "minddata/dataset/engine/ir/datasetops/cache_node.h"
#include "minddata/dataset/engine/ir/datasetops/epoch_ctrl_node.h"
#include "minddata/dataset/engine/ir/datasetops/repeat_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
namespace mindspore {
namespace dataset {
@ -206,10 +206,10 @@ Status RepeatPass::VisitAfter(std::shared_ptr<CacheLookupNode> node, bool *const
}
#endif
Status RepeatPass::VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) {
Status RepeatPass::VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) {
RETURN_UNEXPECTED_IF_NULL(node);
RETURN_UNEXPECTED_IF_NULL(modified);
// Set total repeats and total epochs for the TransferNode
// Set total repeats and total epochs for the DataQueueNode
node->SetTotalRepeats(num_epochs_);
node->SetNumEpochs(num_epochs_);
return Status::OK();

View File

@ -95,11 +95,11 @@ class RepeatPass : public IRNodePass {
Status VisitAfter(std::shared_ptr<CacheLookupNode> node, bool *const modified) override;
#endif
/// \brief Sets the epoch count for TransferNode
/// \brief Sets the epoch count for DataQueueNode
/// \param[in] node The node being visited
/// \param[in,out] modified Indicator if the node was changed at all
/// \return Status The status code returned
Status VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) override;
Status VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) override;
/// \brief All operators have a flag that might be set related to the repeat and any leaf nodes need to be set up
/// for use with a controlling repeat above it.

View File

@ -21,7 +21,7 @@
#include "minddata/dataset/engine/ir/datasetops/root_node.h"
#include "minddata/dataset/engine/ir/datasetops/skip_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
@ -60,12 +60,12 @@ Status AddSkipPass::InjectionFinder::Visit(std::shared_ptr<BuildSentenceVocabNod
}
#endif
Status AddSkipPass::InjectionFinder::VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) {
Status AddSkipPass::InjectionFinder::VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) {
RETURN_UNEXPECTED_IF_NULL(node);
RETURN_UNEXPECTED_IF_NULL(modified);
CHECK_FAIL_RETURN_UNEXPECTED(node->Children().size() > 0,
"Invalid data, the number of children should be greater than zero.");
// Assumption: There is only one TransferNode in a pipeline. This assumption is not validated here.
// Assumption: There is only one DataQueueNode in a pipeline. This assumption is not validated here.
// Move the injection point to the child of this node.
injection_point_ = node->Children()[0];
return Status::OK();

View File

@ -60,11 +60,11 @@ class AddSkipPass : public IRTreePass {
Status Visit(std::shared_ptr<BuildSentenceVocabNode> node, bool *const modified) override;
#endif
/// \brief Register the TransferNode for further action.
/// \brief Register the DataQueueNode for further action.
/// \param[in] node The node being visited
/// \param[in, out] modified Indicator if the node was changed at all
/// \return Status The status code returned
Status VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) override;
Status VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) override;
/// \brief Getter
std::shared_ptr<DatasetNode> injection_point() { return injection_point_; }

View File

@ -18,7 +18,7 @@
#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
#include "minddata/dataset/engine/ir/datasetops/epoch_ctrl_node.h"
#include "minddata/dataset/engine/ir/datasetops/root_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
namespace mindspore {
namespace dataset {
@ -57,12 +57,12 @@ Status EpochCtrlPass::InjectionFinder::Visit(std::shared_ptr<BuildSentenceVocabN
}
#endif
Status EpochCtrlPass::InjectionFinder::VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) {
Status EpochCtrlPass::InjectionFinder::VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) {
RETURN_UNEXPECTED_IF_NULL(node);
RETURN_UNEXPECTED_IF_NULL(modified);
CHECK_FAIL_RETURN_UNEXPECTED(node->Children().size() > 0,
"Invalid data, the node of child should be greater than zero.");
// Assumption: There is only one TransferNode in a pipeline. This assumption is not validated here.
// Assumption: There is only one DataQueueNode in a pipeline. This assumption is not validated here.
// Move the injection point to the child of this node.
injection_point_ = node->Children()[0];
return Status::OK();

View File

@ -62,11 +62,11 @@ class EpochCtrlPass : public IRTreePass {
Status Visit(std::shared_ptr<BuildSentenceVocabNode> node, bool *const modified) override;
#endif
/// \brief Register the TransferNode for further action.
/// \brief Register the DataQueueNode for further action.
/// \param[in] node The node being visited
/// \param[in, out] modified Indicator if the node was changed at all
/// \return Status The status code returned
Status VisitAfter(std::shared_ptr<TransferNode> node, bool *const modified) override;
Status VisitAfter(std::shared_ptr<DataQueueNode> node, bool *const modified) override;
/// \brief Getter
std::shared_ptr<DatasetNode> injection_point() { return injection_point_; }

View File

@ -168,7 +168,7 @@ Status AutoTune::SetAutotuneConfigJson() {
if (autotune_config_json_.empty()) {
nlohmann::json out_json;
RETURN_IF_NOT_OK(Serdes::SaveToJSON(tree_adapter_->RootIRNode(), "", &out_json));
// We do not want to serialize TransferNode/DeviceQueueOp
// We do not want to serialize DataQueueNode/DataQueueOp
if (out_json["op_type"] == kTransferNode) {
CHECK_FAIL_RETURN_UNEXPECTED(
out_json["children"].size() == 1,
@ -186,7 +186,7 @@ Status AutoTune::SummarizeTreeConfiguration(std::vector<std::string> *out) {
constexpr int val_width = 2;
for (int i = static_cast<int>(ops_.size()) - 1; i >= 0; --i) {
const auto op = ops_[i];
if (!op->inlined() && op->Name() != "DeviceQueueOp") {
if (!op->inlined() && op->Name() != "DataQueueOp") {
std::stringstream s;
s << std::left << std::setw(op_name_width) << op->NameWithID() << "(num_parallel_workers:" << std::right
<< std::setw(val_width) << (op->NumWorkers() == 0 ? "NA" : std::to_string(op->NumWorkers()))
@ -213,7 +213,7 @@ void AutoTune::PostMainLogging() const {
void AutoTune::PrintTreeConfiguration() const {
ExecutionTree const *tree = tree_adapter_->tree_.get();
for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
if (!itr->inlined() && itr->Name() != "DeviceQueueOp") {
if (!itr->inlined() && itr->Name() != "DataQueueOp") {
MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers()
<< " prefetch_size: " << itr->ConnectorCapacity();
}
@ -395,7 +395,7 @@ Status AutoTune::RunIterationStep() {
Status AutoTune::RegisterWorkersQueue() {
ExecutionTree *tree = tree_adapter_->tree_.get();
for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
if (!itr->inlined() && itr->Name() != "DeviceQueueOp") {
if (!itr->inlined() && itr->Name() != "DataQueueOp") {
(void)phase_1_best_workers.push_back(itr->NumWorkers());
(void)phase_1_best_queue.push_back(itr->ConnectorCapacity());
}
@ -410,7 +410,7 @@ Status AutoTune::ResetWorkersQueue() {
ExecutionTree *tree = tree_adapter_->tree_.get();
int counter = 0;
for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
if (!itr->inlined() && itr->Name() != "DeviceQueueOp") {
if (!itr->inlined() && itr->Name() != "DataQueueOp") {
int32_t target_workers = phase_1_best_workers[counter];
int32_t target_queue = phase_1_best_queue[counter];
RETURN_IF_NOT_OK(RequestNumWorkerChange(itr->id(), -1, &target_workers));
@ -689,7 +689,7 @@ Status AutoTune::AnalyseMemory() {
if (phase_3_state_ == AutoTuneMemPhase::kAutoTuneMemInit) {
count_down_ = 0;
for (auto op_id : parallel_ops_ids_) {
if ((SkipOpsCheck(op_id)) || (ops_[op_id]->Name() == "DeviceQueueOp")) {
if ((SkipOpsCheck(op_id)) || (ops_[op_id]->Name() == "DataQueueOp")) {
// Op not supported - ignore throughout AT
(void)OP_values.push_back(-1);
continue;

View File

@ -54,9 +54,9 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node) const {
json_node["op_type"] = node.Name();
json_node["num_workers"] = node.NumWorkers();
json metrics;
// DeviceQueueOp is a special op,it is not inlined but its output queue is invalid.
// DataQueueOp is a special op; it is not inlined but its output queue is invalid.
// So we should not output its queue size.
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
if (!node.inlined() && node.Name() != "DataQueueOp") {
metrics["output_queue"] = {{"length", node.ConnectorCapacity()}};
}
json_node["metrics"] = metrics;
@ -90,7 +90,7 @@ Status ConnectorSize::SaveToFile(const std::string &dir_path, const std::string
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
auto &ops_data = output["op_info"];
if (ops_data[idx]["metrics"].contains("output_queue") && ops_data[idx]["op_type"] != "DeviceQueueOp") {
if (ops_data[idx]["metrics"].contains("output_queue") && ops_data[idx]["op_type"] != "DataQueueOp") {
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
}
}

View File

@ -377,40 +377,40 @@ class ProfilingManager {
/// \return Status object with the error code
Status GetConnectorSizeByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);
/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector size of DatasetIterator or DataQueueOp
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSizeByEpoch(int32_t epoch_num, std::vector<int32_t> *result);
/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector size of DatasetIterator or DataQueueOp
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSizeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);
/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector size of DatasetIterator or DataQueueOp
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSizeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);
/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector capacity of DatasetIterator or DataQueueOp
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with connector capacity at each step
/// \return Status object with the error code
Status GetConnectorCapacityByEpoch(int32_t epoch_num, std::vector<int32_t> *result);
/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector capacity of DatasetIterator or DataQueueOp
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with connector capacity at each step
/// \return Status object with the error code
Status GetConnectorCapacityByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);
/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \brief API to get the connector capacity of DatasetIterator or DataQueueOp
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with connector capacity for steps in the given time range
@ -477,20 +477,20 @@ class ProfilingManager {
/// \return Status object with the error code
Status GetBatchTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);
/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \brief API to get fraction of steps that DatasetIterator or DataQueueOp connector was empty
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result The empty queue frequency
/// \return Status object with the error code
Status GetEmptyQueueFrequencyByEpoch(int32_t epoch_num, float_t *result);
/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \brief API to get fraction of steps that DatasetIterator or DataQueueOp connector was empty
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result The empty queue frequency
/// \return Status object with the error code
Status GetEmptyQueueFrequencyByStep(int32_t start_step, int32_t end_step, float_t *result);
/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \brief API to get fraction of steps that DatasetIterator or DataQueueOp connector was empty
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result The empty queue frequency

View File

@ -222,7 +222,7 @@ Status Serdes::CreateDatasetOperationNode(const std::shared_ptr<DatasetNode> &ds
} else if (op_type == kSkipNode) {
RETURN_IF_NOT_OK(SkipNode::from_json(json_obj, ds, result));
} else if (op_type == kTransferNode) {
RETURN_IF_NOT_OK(TransferNode::from_json(json_obj, ds, result));
RETURN_IF_NOT_OK(DataQueueNode::from_json(json_obj, ds, result));
} else if (op_type == kTakeNode) {
RETURN_IF_NOT_OK(TakeNode::from_json(json_obj, ds, result));
} else {

View File

@ -40,7 +40,7 @@
#include "minddata/dataset/engine/ir/datasetops/repeat_node.h"
#include "minddata/dataset/engine/ir/datasetops/shuffle_node.h"
#include "minddata/dataset/engine/ir/datasetops/skip_node.h"
#include "minddata/dataset/engine/ir/datasetops/transfer_node.h"
#include "minddata/dataset/engine/ir/datasetops/data_queue_node.h"
#include "minddata/dataset/engine/ir/datasetops/take_node.h"
#include "minddata/dataset/engine/ir/datasetops/zip_node.h"

View File

@ -86,7 +86,7 @@ class MS_API Dataset : public std::enable_shared_from_this<Dataset> {
public:
// need friend class so they can access the children_ field
friend class Iterator;
friend class TransferNode;
friend class DataQueueNode;
/// \brief Constructor
Dataset();

View File

@ -66,7 +66,7 @@ class MS_API Dataset : public std::enable_shared_from_this<Dataset> {
public:
// need friend class so they can access the children_ field
friend class Iterator;
friend class TransferNode;
friend class DataQueueNode;
/// \brief Constructor
Dataset();

View File

@ -19,7 +19,7 @@
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/system_pool.h"
#ifdef WITH_BACKEND
#include "mindspore/ccsrc/include/backend/data_queue/data_queue_mgr.h"
#include "mindspore/ccsrc/runtime/hardware/device_context_manager.h"
#endif
namespace mindspore {
namespace dataset {
@ -247,9 +247,15 @@ Status Arena::Init() {
int64_t sz = size_in_MB_ * 1048576L;
#ifdef WITH_BACKEND
if (is_cuda_malloc_) {
ptr_ = device::DataQueueMgr::GetInstance().AllocHostMem("GPU", sz);
auto ms_context = MsContext::GetInstance();
RETURN_UNEXPECTED_IF_NULL(ms_context);
auto device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
{ms_context->get_param<std::string>(MS_CTX_DEVICE_TARGET), ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID)});
RETURN_UNEXPECTED_IF_NULL(device_context);
RETURN_UNEXPECTED_IF_NULL(device_context->device_res_manager_);
ptr_ = device_context->device_res_manager_->AllocateHostMemory(sz);
} else {
ptr_ = device::DataQueueMgr::GetInstance().AllocHostMem("Others", sz);
ptr_ = std::shared_ptr<void>(::malloc(sz), ::free);
}
#else
ptr_ = std::shared_ptr<void>(::malloc(sz), ::free);
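
A compact sketch of the allocation policy this hunk switches to: prefer the device resource manager's host allocator when a device context is available, otherwise fall back to plain malloc/free. The helper name is illustrative, not part of this patch:

std::shared_ptr<void> AllocArenaHostMem(int64_t sz, const device::DeviceContext *device_context) {
  if (device_context != nullptr && device_context->device_res_manager_ != nullptr) {
    // Pinned (page-locked) host memory on GPU; the base-class default is malloc/free.
    return device_context->device_res_manager_->AllocateHostMemory(static_cast<size_t>(sz));
  }
  return std::shared_ptr<void>(::malloc(sz), ::free);
}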

View File

@ -140,7 +140,7 @@ Status Task::Run() {
Status Task::Join(WaitFlag blocking) {
#ifdef WITH_BACKEND
MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
std::string device_target = MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET);
#endif
if (running_) {
@ -166,8 +166,8 @@ Status Task::Join(WaitFlag blocking) {
wait_times++;
#ifdef WITH_BACKEND
if (device_target == kAscendDevice) {
// Because hostPush hung in DeviceQueueOp, wait 5 seconds and destroy the tdt
if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) {
// Because hostPush hung in DataQueueOp, wait 5 seconds and destroy the tdt
if (wait_times > 5 && my_name_.find("DataQueueOp") != std::string::npos) {
MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
<< "the task: " << my_name_ << " will be destroyed by TdtHostDestory.";
auto queue =
@ -179,7 +179,7 @@ Status Task::Join(WaitFlag blocking) {
}
// just wait 30 seconds
// case1: cpu usage 100%, DeviceQueueOp thread may destroy without thrd_ future
// case1: cpu usage 100%, DataQueueOp thread may destroy without thread_future
if (wait_times > kWaitInterruptTaskTime) {
MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str()
<< " is not responding. Maybe it's destroyed, task stop.";

View File

@ -141,18 +141,18 @@ AscendDataQueueDynamic::AscendDataQueueDynamic(const size_t capacity)
DataQueueStatus AscendDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
if (item.data_ptr_ == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
if (item.data_ptr == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr << ", len: " << item.data_len;
return DataQueueStatus::ERROR_INPUT;
}
void *addr = device_context_->device_res_manager_->AllocateMemory(item.data_len_);
void *addr = device_context_->device_res_manager_->AllocateMemory(item.data_len);
if (addr == nullptr) {
MS_LOG(ERROR) << "Allocate device memory of data queue failed";
}
CheckRtRetWithError(
rtMemcpyAsync(addr, item.data_len_, item.data_ptr_, item.data_len_, RT_MEMCPY_HOST_TO_DEVICE, stream_),
rtMemcpyAsync(addr, item.data_len, item.data_ptr, item.data_len, RT_MEMCPY_HOST_TO_DEVICE, stream_),
"Rt Memcpy Error");
item.device_addr_ = addr;
item.device_addr = addr;
}
CheckRtRetWithError(rtStreamSynchronize(stream_), "Call runtime rtStreamSynchronize failed");
node_info_[tail_].data_ = std::move(data);
@ -163,7 +163,7 @@ DataQueueStatus AscendDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
DataQueueStatus AscendDataQueueDynamic::Front(std::vector<DataQueueItem> *data) const {
for (auto &item : node_info_[head_].data_) {
host_release_(item.data_ptr_, item.worker_id_);
host_release_(item.data_ptr, item.worker_id);
}
*data = node_info_[head_].data_;
return DataQueueStatus::SUCCESS;
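
Producer-side sketch using the renamed DataQueueItem members; the host buffer is assumed to be owned by the caller, and the queue handle can be obtained via DataQueueMgr::GetInstance().GetDataQueue(channel_name):

DataQueueItem item;
item.worker_id = 0;
item.data_type = "float32";
item.shapes = {32, 3, 224, 224};
item.data_len = 32 * 3 * 224 * 224 * sizeof(float);
item.data_ptr = host_buffer;  // caller-owned host memory (assumed)
std::vector<DataQueueItem> batch;
batch.emplace_back(std::move(item));
// On success the queue copies each item to device memory and fills its device_addr.
(void)data_queue->Push(std::move(batch));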
@ -300,12 +300,12 @@ bool AscendTdtQueue::AssembleTensor2AclDataset(const std::vector<DataQueueItem>
for (const auto &ts : data) {
aclDataType acl_type;
acltdtDataItem *acl_data = nullptr;
if (!GetAclDataType(ts.data_type_, &acl_type)) {
MS_LOG(ERROR) << "Convert type " << ts.data_type_ << " to acl type failed.";
if (!GetAclDataType(ts.data_type, &acl_type)) {
MS_LOG(ERROR) << "Convert type " << ts.data_type << " to acl type failed.";
return false;
}
const auto &shape = ts.shapes_;
const auto &shape = ts.shapes;
std::string shape_str = "[";
for (auto dim : shape) {
(void)shape_str.append(std::to_string(dim)).append(",");
@ -313,8 +313,8 @@ bool AscendTdtQueue::AssembleTensor2AclDataset(const std::vector<DataQueueItem>
shape_str.pop_back();
(void)shape_str.append("]");
void *data_ptr = ts.data_ptr_;
size_t data_size = ts.data_len_;
void *data_ptr = ts.data_ptr;
size_t data_size = ts.data_len;
acl_data = acltdtCreateDataItem(acltdtTensorType::ACL_TENSOR_DATA_TENSOR, (shape.empty() ? nullptr : &shape[0]),
shape.size(), acl_type, data_ptr, data_size);
@ -499,20 +499,20 @@ bool AscendHostQueue::CreateDataItemInfos(const std::vector<DataQueueItem> &data
for (auto &ts : data) {
aclDataType acl_type;
if (!GetAclDataType(ts.data_type_, &acl_type)) {
MS_LOG(ERROR) << "Convert type " << ts.data_type_ << " to acl type failed.";
if (!GetAclDataType(ts.data_type, &acl_type)) {
MS_LOG(ERROR) << "Convert type " << ts.data_type << " to acl type failed.";
return false;
}
const auto &shape = ts.shapes_;
void *data_ptr = ts.data_ptr_;
size_t data_size = ts.data_len_;
const auto &shape = ts.shapes;
void *data_ptr = ts.data_ptr;
size_t data_size = ts.data_len;
if (ts.data_type_ != "string") {
if (ts.data_type != "string") {
items->emplace_back(BuildDataItemInfo(ACL_TENSOR_DATA_TENSOR, static_cast<int32_t>(acl_type),
(shape.empty() ? nullptr : &shape[0]), shape.size(), data_ptr, data_size));
} else {
MS_LOG(ERROR) << "Create data item failed when send data with type:" << ts.data_type_;
MS_LOG(ERROR) << "Create data item failed when send data with type:" << ts.data_type;
}
}
return true;

View File

@ -1081,7 +1081,7 @@ bool AscendKernelRuntime::RunTask(const session::KernelGraph &graph) {
DumpTaskExceptionInfo(graph);
#endif
#ifdef WITH_BACKEND
// Run task error, we should call TdtHostDestroy to release tdt to avoid DeviceQueueOp hostPush hung
// Run task error, we should call TdtHostDestroy to release tdt to avoid DataQueueOp hostPush hung
// case1: cpu usage 100% cause thread/process exit, but some tdt thread remain in backend
if (!tdt_handle::DestroyHandle()) {
MS_LOG(WARNING) << "Destroy tdt channel failed.";

View File

@ -32,20 +32,6 @@ void SetThreadToDeviceId(uint32_t device_id) {
MS_LOG(EXCEPTION) << "cudaSetDevice failed, ret[" << static_cast<int>(ret) << "], " << cudaGetErrorString(ret);
}
}
std::shared_ptr<void> AllocHostMemory(size_t size) {
void *ptr;
auto ret = cudaHostAlloc(&ptr, size, cudaHostAllocDefault);
if (ret != cudaSuccess) {
MS_LOG(EXCEPTION) << "cudaHostAlloc failed, ret[" << static_cast<int>(ret) << "], " << cudaGetErrorString(ret);
}
return std::shared_ptr<void>(ptr, [](void *ptr) -> void {
if (ptr != nullptr) {
(void)cudaFreeHost(ptr);
}
});
}
} // namespace
GpuDataQueueDynamic::GpuDataQueueDynamic(const size_t capacity) : DataQueue(capacity), node_info_(nullptr) {
node_info_ = std::make_unique<NodeInfo[]>(capacity);
@ -61,14 +47,14 @@ GpuDataQueueDynamic::GpuDataQueueDynamic(const size_t capacity) : DataQueue(capa
DataQueueStatus GpuDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
if (item.data_ptr_ == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
if (item.data_ptr == nullptr) {
MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr << ", len: " << item.data_len;
return DataQueueStatus::ERROR_INPUT;
}
void *addr = device_context_->device_res_manager_->AllocateMemory(item.data_len_);
CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
void *addr = device_context_->device_res_manager_->AllocateMemory(item.data_len);
CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr, item.data_len, cudaMemcpyHostToDevice, stream_),
"Cuda Memcpy Error");
item.device_addr_ = addr;
item.device_addr = addr;
}
node_info_[tail_].event_.reset(new cudaEvent_t());
@ -84,7 +70,7 @@ DataQueueStatus GpuDataQueueDynamic::Front(std::vector<DataQueueItem> *data) con
CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
for (auto &item : node_info_[head_].data_) {
host_release_(item.data_ptr_, item.worker_id_);
host_release_(item.data_ptr, item.worker_id);
}
*data = node_info_[head_].data_;
return DataQueueStatus::SUCCESS;
@ -100,8 +86,6 @@ bool GpuDataQueueDynamic::Destroy() { return true; }
void GpuDataQueueDynamic::SetThreadDevice() { SetThreadToDeviceId(device_id_); }
std::shared_ptr<void> GpuDataQueueDynamic::AllocHostMem(size_t size) { return AllocHostMemory(size); }
GpuQueue::GpuQueue(size_t capacity, void *addr, const std::vector<size_t> &shape)
: DataQueue(capacity), buffer_(addr), shape_(shape), len_(0), stream_(0), node_info_(nullptr) {
CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
@ -120,24 +104,24 @@ DataQueueStatus GpuQueue::Push(std::vector<DataQueueItem> data) {
void *addr = reinterpret_cast<uint8_t *>(buffer_) + tail_ * len_;
for (size_t i = 0; i < data.size(); i++) {
auto &item = data[i];
MS_EXCEPTION_IF_NULL(item.data_ptr_);
if (item.data_len_ != shape_[i] && !ds_detected_) {
MS_EXCEPTION_IF_NULL(item.data_ptr);
if (item.data_len != shape_[i] && !ds_detected_) {
MS_LOG(WARNING) << "Detected that dataset is dynamic shape, it is suggested to call network.set_inputs() to "
"configure dynamic dims of input data before running the network";
ds_detected_ = true;
}
if (item.data_len_ > shape_[i]) {
MS_LOG(ERROR) << "Data size(" << item.data_len_ << ") of item " << item.data_ptr_ << " exceeds the max capacity("
if (item.data_len > shape_[i]) {
MS_LOG(ERROR) << "Data size(" << item.data_len << ") of item " << item.data_ptr << " exceeds the max capacity("
<< shape_[i] << "), "
<< "you need to call network.set_inputs() to "
"configure dynamic dims of input data before running the network";
return DataQueueStatus::ERROR_INPUT;
}
CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr, item.data_len, cudaMemcpyHostToDevice, stream_),
"Cuda Memcpy Error");
item.device_addr_ = addr;
addr = reinterpret_cast<uint8_t *>(addr) + item.data_len_;
item.device_addr = addr;
addr = reinterpret_cast<uint8_t *>(addr) + item.data_len;
}
node_info_[tail_].event_.reset(new cudaEvent_t());
@ -153,7 +137,7 @@ DataQueueStatus GpuQueue::Front(std::vector<DataQueueItem> *data) const {
CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
for (auto &item : node_info_[head_].data_) {
host_release_(item.data_ptr_, item.worker_id_);
host_release_(item.data_ptr, item.worker_id);
}
*data = node_info_[head_].data_;
return DataQueueStatus::SUCCESS;
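
A minimal CUDA sketch of the event handshake GpuQueue relies on: Push records an event after the async host-to-device copy, Front synchronizes on it before exposing the data. Buffers and sizes (device_dst, host_src, len) are assumed and error handling is trimmed:

cudaStream_t stream;
cudaEvent_t copied;
cudaStreamCreate(&stream);
cudaEventCreate(&copied);

// Push side: enqueue the copy, then mark its completion point on the stream.
cudaMemcpyAsync(device_dst, host_src, len, cudaMemcpyHostToDevice, stream);
cudaEventRecord(copied, stream);

// Front side: block until the recorded copy has finished, then release the resources.
cudaEventSynchronize(copied);
cudaEventDestroy(copied);
cudaStreamDestroy(stream);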
@ -180,8 +164,6 @@ bool GpuQueue::Destroy() {
void GpuQueue::SetThreadDevice() { SetThreadToDeviceId(device_id_); }
std::shared_ptr<void> GpuQueue::AllocHostMem(size_t size) { return AllocHostMemory(size); }
namespace {
std::shared_ptr<DataQueue> CreateGpuDataQueue(const std::string &, bool dynamic_shape, size_t capacity, void *addr,
const std::vector<size_t> &shape) {

View File

@ -39,7 +39,6 @@ class BACKEND_EXPORT GpuDataQueueDynamic : public DataQueue {
bool Destroy() override;
void SetThreadDevice() override;
std::shared_ptr<void> AllocHostMem(size_t size) override;
private:
struct NodeInfo {
@ -65,7 +64,6 @@ class BACKEND_EXPORT GpuQueue : public DataQueue {
bool Destroy() override;
void SetThreadDevice() override;
std::shared_ptr<void> AllocHostMem(size_t size) override;
private:
struct NodeInfo {

View File

@ -759,6 +759,20 @@ DeprecatedInterface *GPUDeviceContext::GetDeprecatedInterface() {
return deprecated_interface_.get();
}
std::shared_ptr<void> GPUDeviceResManager::AllocateHostMemory(size_t size) const {
void *ptr;
if (CudaDriver::AllocHostPinnedMem(size, &ptr) != size) {
MS_LOG(ERROR) << "Failed to allow host pinned memory.";
return nullptr;
}
return std::shared_ptr<void>(ptr, [](void *ptr) -> void {
if (ptr != nullptr) {
CudaDriver::FreeHostPinnedMem(ptr);
}
});
}
MS_REGISTER_DEVICE(kGPUDevice, GPUDeviceContext);
} // namespace gpu
} // namespace device

View File

@ -42,6 +42,8 @@ class GPUDeviceResManager : public DeviceResManager {
bool BindDeviceToCurrentThread() const override;
std::shared_ptr<void> AllocateHostMemory(size_t size) const override;
std::vector<void *> AllocateContinuousMemory(const std::vector<size_t> &size_list) const override;
DeviceAddressPtr CreateDeviceAddress(void *const device_ptr, size_t device_size, const string &format, TypeId type_id,

View File

@ -167,8 +167,8 @@ bool DatasetIteratorKernelMod::Launch(const std::vector<AddressPtr> &, const std
for (size_t i = 0; i < output_data_.size(); i++) {
void *output_addr = GetDeviceAddress<void>(outputs, i);
auto device_addr = output_data_[i].device_addr_;
auto data_len = output_data_[i].data_len_;
auto device_addr = output_data_[i].device_addr;
auto data_len = output_data_[i].data_len;
CHECK_CUDA_RET_WITH_EXCEPT(kernel_node_,
cudaMemcpyAsync(output_addr, device_addr, data_len, cudaMemcpyDeviceToDevice,
reinterpret_cast<cudaStream_t>(stream)),
@ -187,7 +187,7 @@ void DatasetIteratorKernelMod::SyncData() {
std::vector<ShapeVector> shapes;
for (const auto &item : output_data_) {
ShapeVector shape;
std::transform(item.shapes_.begin(), item.shapes_.end(), std::back_inserter(shape), LongToSize);
std::transform(item.shapes.begin(), item.shapes.end(), std::back_inserter(shape), LongToSize);
shapes.push_back(shape);
}
common::AnfAlgo::SetOutputInferTypeAndShape(types_, shapes, kernel_node_.lock().get());

View File

@ -120,7 +120,7 @@ DataQueueStatus DataQueueMgr::CreateDynamicBufQueue(const std::string &channel_n
MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
std::string device_name = MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET);
std::shared_ptr<DataQueue> device_queue = CreateDataQueue(device_name, channel_name, true, capacity, nullptr, {});
if (device_queue != nullptr && (device_name == kGPUDevice || device_name == kAscendDevice)) {
if (device_queue != nullptr) {
std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
DataQueueStatus rt = queue->Create(device_queue);
if (rt != DataQueueStatus::SUCCESS) {
@ -261,23 +261,15 @@ std::shared_ptr<DataQueue> DataQueueMgr::GetDataQueue(const std::string &channel
return iter->second->Queue();
}
DataQueueStatus DataQueueMgr::SetThreadDevice(const std::string &device_name) {
auto tmp_queue = CreateDataQueue(device_name, {}, false, 0, nullptr, {});
if (tmp_queue == nullptr) {
DataQueueStatus DataQueueMgr::SetThreadDevice(const std::string &channel_name) {
auto queue = GetDataQueue(channel_name);
if (queue == nullptr) {
return DataQueueStatus::QUEUE_NOT_EXIST;
}
tmp_queue->SetThreadDevice();
queue->SetThreadDevice();
return DataQueueStatus::SUCCESS;
}
std::shared_ptr<void> DataQueueMgr::AllocHostMem(const std::string &device_name, size_t size) {
auto tmp_queue = CreateDataQueue(device_name, {}, false, 0, nullptr, {});
if (tmp_queue == nullptr) {
return std::shared_ptr<void>(::malloc(size), ::free);
}
return tmp_queue->AllocHostMem(size);
}
#ifndef BUILD_LITE
bool PopDataFromDataQueue(const AnfNodePtr &data_kernel) {
auto queue_name = common::AnfAlgo::GetNodeAttr<std::string>(data_kernel, "shared_name");
@ -299,12 +291,12 @@ bool PopDataFromDataQueue(const AnfNodePtr &data_kernel) {
std::vector<TypeId> types;
std::vector<size_t> output_size_list;
for (size_t i = 0; i < data.size(); ++i) {
device_tensors[i]->set_ptr(data[i].device_addr_);
device_tensors[i]->SetSize(data[i].data_len_);
device_tensors[i]->set_ptr(data[i].device_addr);
device_tensors[i]->SetSize(data[i].data_len);
device_tensors[i]->set_from_mem_pool(true);
output_size_list.push_back(data[i].data_len_);
output_size_list.push_back(data[i].data_len);
(void)kernel_info->SetOutputAddr(device_tensors[i], i);
shapes.push_back(data[i].shapes_);
shapes.push_back(data[i].shapes);
types.push_back(common::AnfAlgo::GetOutputInferDataType(data_kernel, i));
}
auto kernel_mod = kernel_info->MutableKernelMod();
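
Following the SetThreadDevice signature change above, a caller-side sketch of the new contract: the queue identified by the channel name must already exist before the thread can be bound to its device (the channel name is illustrative):

auto &mgr = device::DataQueueMgr::GetInstance();
if (mgr.SetThreadDevice("dataset_channel_0") != DataQueueStatus::SUCCESS) {
  MS_LOG(WARNING) << "Data queue for this channel does not exist; open or create it first.";
}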

View File

@ -111,6 +111,11 @@ class DeviceResManager {
virtual bool AllocateMemory(DeviceAddress *const &address) const;
virtual void FreeMemory(DeviceAddress *const &address) const;
// Allocate host memory with RAII and ref count
virtual std::shared_ptr<void> AllocateHostMemory(size_t size) const {
return std::shared_ptr<void>(::malloc(size), ::free);
}
// Allocate continuous device memory according to size list.
// Communication operators may need continuous memory for input and output
// to optimize the communication performance.
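
Sketch of how a backend can override the new default, modeled on the GPU override earlier in this patch; the class name and pinned-memory helpers are illustrative, not real APIs:

class MyDeviceResManager : public DeviceResManager {
 public:
  std::shared_ptr<void> AllocateHostMemory(size_t size) const override {
    void *ptr = AllocPinned(size);  // backend-specific pinned allocation (assumed helper)
    if (ptr == nullptr) {
      return nullptr;
    }
    return std::shared_ptr<void>(ptr, [](void *p) {
      if (p != nullptr) {
        FreePinned(p);  // matching backend-specific release (assumed helper)
      }
    });
  }
};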

View File

@ -159,7 +159,7 @@ if(MSLITE_MINDDATA_IMPLEMENT STREQUAL "full")
${MINDDATA_DIR}/engine/datasetops/dataset_op.cc
${MINDDATA_DIR}/engine/datasetops/repeat_op.cc
${MINDDATA_DIR}/engine/datasetops/epoch_ctrl_op.cc
${MINDDATA_DIR}/engine/datasetops/device_queue_op.cc
${MINDDATA_DIR}/engine/datasetops/data_queue_op.cc
${MINDDATA_DIR}/engine/datasetops/project_op.cc
${MINDDATA_DIR}/engine/datasetops/shuffle_op.cc
${MINDDATA_DIR}/engine/datasetops/skip_op.cc

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -3771,8 +3771,8 @@ class TransferDataset(Dataset):
total_batch = 0
if hasattr(self.children[0], "__total_batch__"):
total_batch = self.children[0].__total_batch__
return cde.TransferNode(children[0], self.queue_name, self.device_type, self.device_id, self._send_epoch_end,
total_batch, self._create_data_info_queue)
return cde.DataQueueNode(children[0], self.queue_name, self.device_type, self.device_id, self._send_epoch_end,
total_batch, self._create_data_info_queue)
def create_dict_iterator(self, num_epochs=-1, output_numpy=False):
raise RuntimeError("TransferDataset is not iterable.")

View File

@ -506,7 +506,7 @@ TEST_F(MindDataTestDeserialize, TestDeserializeFill) {
std::shared_ptr<TensorOperation> operation2 = std::make_shared<text::ToNumberOperation>("int32_t");
std::vector<std::shared_ptr<TensorOperation>> ops = {operation1, operation2};
ds = std::make_shared<MapNode>(ds, ops);
ds = std::make_shared<TransferNode>(ds, "queue", "type", 1, true, 10, true);
ds = std::make_shared<DataQueueNode>(ds, "queue", "type", 1, true, 10, true);
compare_dataset(ds);
}