Sync in dataset recovery before graph compile & optimize reset performance for Random Accessible Dataset

luoyang 2024-03-21 15:30:19 +08:00
parent 5fad95269d
commit 8992c64eb3
13 changed files with 130 additions and 6 deletions

View File

@@ -179,6 +179,12 @@ PYBIND_REGISTER(ToDevice, 1, ([](const py::module *m) {
                     }
                     return py::make_tuple(types, shapes);
                   })
                 .def("GetMbufQueueSize",
                      [](ToDevice &self) {
                        size_t queue_size = 0;
                        THROW_IF_ERROR(self.GetMbufQueueSize(&queue_size));
                        return queue_size;
                      })
                 .def("GetSendInfo",
                      [](ToDevice &self) {
                        std::vector<std::vector<double>> send_info;
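Like the GetSendInfo binding right below it, the new GetMbufQueueSize entry wraps the Status-returning C++ call in THROW_IF_ERROR, so failures surface in Python as exceptions and the caller receives the plain queue size.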

View File

@@ -366,6 +366,17 @@ Status ToDevice::GetDataInfo(std::vector<DataType> *const types, std::vector<Ten
  return Status::OK();
}
Status ToDevice::GetMbufQueueSize(size_t *queue_size) {
  RETURN_UNEXPECTED_IF_NULL(queue_size);
  // tree_.root() must be DataQueueOp
  std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
  CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr.");
  DataQueueOp *op = dynamic_cast<DataQueueOp *>(root.get());
  CHECK_FAIL_RETURN_UNEXPECTED(op != nullptr, "GetMbufQueueSize only supported by DataQueueOp");
  RETURN_IF_NOT_OK(op->GetMbufQueueSize(queue_size));
  return Status::OK();
}

Status ToDevice::GetSendInfo(std::vector<std::vector<double>> *send_info) {
  RETURN_UNEXPECTED_IF_NULL(send_info);
  // tree_.root() must be DataQueueOp

View File

@@ -251,6 +251,10 @@ class ToDevice : public TreeConsumer {
  /// \return Status error code
  virtual Status GetDataInfo(std::vector<DataType> *types, std::vector<TensorShape> *shapes);

  /// Get the number of elements currently queued in the device (TDT/mbuf) channel
  /// \return Status error code
  virtual Status GetMbufQueueSize(size_t *queue_size);

  /// Get send info in sink mode
  /// \return Status error code
  virtual Status GetSendInfo(std::vector<std::vector<double>> *send_info);

View File

@@ -747,6 +747,17 @@ Status DataQueueOp::GetDataInfo(DATA_INFO *data_info) {
  return Status::OK();
}
Status DataQueueOp::GetMbufQueueSize(size_t *queue_size) {
#ifdef WITH_BACKEND
  if (device_type_ == DeviceType::Ascend) {
    *queue_size = ascend_data_queue_->QueryQueueSize();
  } else {
    *queue_size = 1;
  }
#endif
  return Status::OK();
}

std::vector<std::vector<double>> DataQueueOp::GetSendInfo() {
  std::vector<std::vector<double>> send_info_per_epoch;
  (void)std::transform(send_summary_.begin(), send_summary_.end(), std::back_inserter(send_info_per_epoch),
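Note the fallback: only the Ascend path can query the real mbuf depth via ascend_data_queue_->QueryQueueSize(); every other backend reports a constant 1, so a caller that waits for a non-zero queue size (like the failover loop added in model.py below) returns immediately where no device queue exists.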

View File

@@ -83,6 +83,8 @@ class DataQueueOp : public PipelineOp {
  Status GetDataInfo(DATA_INFO *data_info);

  Status GetMbufQueueSize(size_t *queue_size);

  std::vector<std::vector<double>> GetSendInfo();

  // Name: Print()

View File

@@ -16,8 +16,61 @@
#include "minddata/dataset/engine/datasetops/source/sampler/skip_first_epoch_sampler.h"

#include <algorithm>
#include <memory>
#include <string>

namespace mindspore {
namespace dataset {
Status SkipFirstEpochSamplerRT::GetNextSample(TensorRow *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  if (id_count_ > num_samples_) {
    RETURN_STATUS_UNEXPECTED(
      "[Internal ERROR] Sampler index must be less than or equal to num_samples(total rows in dataset), but got:" +
      std::to_string(id_count_) + ", num_samples_: " + std::to_string(num_samples_));
  } else if (id_count_ == num_samples_) {
    (*out) = TensorRow(TensorRow::kFlagEOE);
  } else {
    if (HasChildSampler()) {
      RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_));
    }
    std::shared_ptr<Tensor> sampleIds;
    // Compute how many ids are left to pack, and pack this amount into a new Tensor. Respect the setting for
    // samples per Tensor though.
    int64_t remaining_ids = num_samples_ - id_count_;
    int64_t num_elements = std::min(remaining_ids, samples_per_tensor_);
    RETURN_IF_NOT_OK(CreateSamplerTensor(&sampleIds, num_elements));
    if (HasChildSampler()) {
      std::string err_msg = "Failed to copy full sample ids into child sampler.";
      int64_t copy_data_length = num_elements * sizeof(int64_t);
      if (copy_data_length < SECUREC_MEM_MAX_LEN) {
        int ret_code = memcpy_s(sampleIds->GetMutableBuffer(), copy_data_length,
                                child_ids_[0]->GetMutableBuffer() + current_id_ * sizeof(int64_t), copy_data_length);
        CHECK_FAIL_RETURN_UNEXPECTED(ret_code == EOK, err_msg);
      } else {
        auto dest = std::memcpy(sampleIds->GetMutableBuffer(),
                                child_ids_[0]->GetMutableBuffer() + current_id_ * sizeof(int64_t), copy_data_length);
        CHECK_FAIL_RETURN_UNEXPECTED(dest == sampleIds->GetMutableBuffer(), err_msg);
      }
      current_id_ += num_elements;
    } else {
      auto idPtr = sampleIds->begin<int64_t>();
      for (int64_t i = 0; i < num_elements; i++) {
        *idPtr = current_id_;
        current_id_++;  // Move the current id to the next one in the sequence
        ++idPtr;
      }
    }
    id_count_ += num_elements;  // Count the packed ids towards our overall sample count
    (*out) = {sampleIds};
  }
  return Status::OK();
}

Status SkipFirstEpochSamplerRT::ResetSampler(const bool failover_reset) {
  // This is a special sampler for Failover Reset, its internal state should
  // not reset when failover_reset is set to true.
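The override hands out ids in chunks rather than one id per call. A rough Python model of the added control flow, under the assumption that current_id_ already points past the rows skipped in the first epoch (illustrative names, not MindSpore API):

    def next_sample_ids(current_id, id_count, num_samples, samples_per_tensor):
        """Model of GetNextSample: returns (ids, new_current_id, new_id_count),
        or None once the epoch's quota of ids has been handed out (EOE)."""
        if id_count > num_samples:
            raise RuntimeError("sampler index exceeded num_samples")
        if id_count == num_samples:
            return None  # corresponds to emitting the EOE flag row
        # Pack everything that is left, capped at samples_per_tensor per call.
        n = min(num_samples - id_count, samples_per_tensor)
        ids = list(range(current_id, current_id + n))
        return ids, current_id + n, id_count + n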

View File

@@ -30,6 +30,8 @@ class SkipFirstEpochSamplerRT : public SequentialSamplerRT {
  // Destructor.
  ~SkipFirstEpochSamplerRT() = default;

  Status GetNextSample(TensorRow *out) override;

  /// \brief Reset for next epoch.
  /// \param[in] failover_reset A boolean to show whether we are resetting the pipeline
  /// \return Status The status code returned

View File

@@ -61,6 +61,7 @@ Status TreeAdapter::PrePass(const std::shared_ptr<DatasetNode> &ir) {
  MS_LOG(INFO) << "Running pre pass loops.";
  (void)actions.emplace_back(std::make_unique<InputValidationPass>());
  (void)actions.emplace_back(std::make_unique<CacheValidationPass>());
  (void)actions.emplace_back(std::make_unique<NodeRemovalPass>());
  (void)actions.emplace_back(std::make_unique<InsertMapPass>());
  if (usage_ == kDeReset) {
    (void)actions.emplace_back(std::make_unique<AddSkipPass>());
@@ -68,7 +69,6 @@ Status TreeAdapter::PrePass(const std::shared_ptr<DatasetNode> &ir) {
      (void)actions.emplace_back(std::make_unique<SkipPushdownPass>());
    }
  }
  (void)actions.emplace_back(std::make_unique<NodeRemovalPass>());
  (void)actions.emplace_back(std::make_unique<EpochCtrlPass>());
  if (usage_ == kDeGetter) {
    (void)actions.emplace_back(std::make_unique<GetterPass>());
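The substantive change in this pass list is that NodeRemovalPass now runs before InsertMapPass and the kDeReset-only AddSkipPass/SkipPushdownPass instead of after them (the NodeRemovalPass line shown under the second hunk header is the removed original). Presumably this is so that nodes slated for removal, such as Skip(0), are already pruned when the skip-pushdown logic inspects the tree, which would also explain why the unit test below stops comparing pipeline structure for those cases.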

View File

@@ -4223,6 +4223,12 @@ class _ToDevice:
        """
        return self._to_device.GetDataInfo()
    def get_mbuf_queue_size(self):
        """
        Get the number of elements inside mbuf.
        """
        return self._to_device.GetMbufQueueSize()

    def get_send_info(self):
        """
        In sink mode, it returns the send information of dataset at this moment.
@@ -4337,6 +4343,14 @@ class TransferDataset(Dataset):
            return self._to_device.get_data_info()
        raise RuntimeError("Calling get_data_info with bad state.")
    def get_mbuf_queue_size(self):
        """
        Get the number of elements inside mbuf.
        """
        if self._to_device is not None:
            return self._to_device.get_mbuf_queue_size()
        raise RuntimeError("Device queue is not initialized; calling get_mbuf_queue_size failed.")

    def get_send_info(self):
        """
        In sink mode, it returns the send information of dataset at this moment.
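A minimal usage sketch (illustrative, not from the commit), assuming transfer_ds is a TransferDataset that has already been sent to device in sink mode:

    import time

    # Poll the device-side queue depth through the new API; RuntimeError is
    # raised (per the code above) if the device queue was never initialized.
    def wait_until_device_has_data(transfer_ds, poll_seconds=10):
        while transfer_ds.get_mbuf_queue_size() == 0:
            time.sleep(poll_seconds)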

View File

@@ -449,6 +449,11 @@ class DatasetHelper:
        # Generally, it works in dynamic shape scenarios.
        return self.iter.get_data_info()
    # pylint: disable=missing-docstring
    def get_mbuf_queue_size(self):
        # In sink mode, it returns the number of elements inside the mbuf channel.
        return self.iter.get_mbuf_queue_size()

    # pylint: disable=missing-docstring
    def get_send_info(self, run_context):
        # In sink mode, it returns the send information of dataset at this moment.
@@ -531,6 +536,7 @@ class _DatasetIter:
        self.release = dataset.__transfer_dataset__.release
        self.continue_send = dataset.__transfer_dataset__.continue_send
        self.get_data_info = dataset.__transfer_dataset__.get_data_info
        self.get_mbuf_queue_size = dataset.__transfer_dataset__.get_mbuf_queue_size
        self.get_send_info = dataset.__transfer_dataset__.get_send_info
        if hasattr(dataset.__transfer_dataset__, "_reset"):
            self._reset = dataset.__transfer_dataset__._reset  # pylint: disable=protected-access
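With this wiring the call chain is complete: DatasetHelper.get_mbuf_queue_size() delegates to _DatasetIter, which is bound here to TransferDataset.get_mbuf_queue_size, which in turn calls the _ToDevice wrapper around the C++ GetMbufQueueSize binding.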

View File

@@ -22,6 +22,7 @@ import os
import math
import copy
import importlib
import time

import numpy as np

import mindspore
@@ -523,6 +524,16 @@ class Model:
                                                  dataset_sink_mode=True,
                                                  sink_size=sink_size)
        self._warmup_dataset(epoch, train_dataset, sink_size)

        if train_dataset.get_init_step() > 0:
            mbuf_size = train_dataset.__transfer_dataset__.get_mbuf_queue_size()
            while mbuf_size == 0:
                time.sleep(10)
                mbuf_size = train_dataset.__transfer_dataset__.get_mbuf_queue_size()
                if mbuf_size != 0:
                    break
                logger.warning(f"Failover mode: waiting for the dataset to recover to the specified step, "
                               f"current device queue size: {mbuf_size}")
        if context.get_auto_parallel_context("pipeline_stages") > 1 and valid_dataset:
            train_network.add_flags_recursive(is_first_iteration=True)
            for inputs in train_dataset_helper:
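This loop is the "sync in dataset recovery before graph compile" half of the commit: when training resumes from a non-zero init step, the dataset pipeline must first skip forward to that step, and blocking here until the device queue reports at least one element ensures recovery has actually produced data before graph compilation triggers the first fetch. On backends without a real mbuf, the hard-coded queue size of 1 makes this wait a no-op.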

View File

@@ -74,6 +74,9 @@ class MindData:
    def get_data_info(self):
        pass

    def get_mbuf_queue_size(self):
        pass

    def get_send_info(self):
        pass

View File

@@ -33,7 +33,8 @@ class MindDataSkipPushdownTestOptimizationPass : public UT::DatasetOpTesting {
  /// \param[in] root_target Target dataset for compare
  /// \param[in] step Skip step
  /// \return Status of the function
  Status prepare_trees(std::shared_ptr<Dataset> root_original, std::shared_ptr<Dataset> root_target, int64_t step = 0,
                       bool check_pipeline = true) {
    auto ir_tree = std::make_shared<TreeAdapter>(TreeAdapter::UsageFlag::kDeReset);

    // Get the dataset size for calculating the initial epoch
@@ -47,7 +48,7 @@ class MindDataSkipPushdownTestOptimizationPass : public UT::DatasetOpTesting {
    RETURN_IF_NOT_OK(ir_tree_target->Compile(root_target->IRNode(), 1,
                                             0));  // Step is 0 for target node tree
    if (step != 0 && check_pipeline) {
      RETURN_IF_NOT_OK(compare_pass(ir_tree_target->RootIRNode(), ir_tree->RootIRNode()));
    }
    RETURN_IF_NOT_OK(compare_pass_row(ir_tree_target, ir_tree));
@@ -367,19 +368,19 @@ TEST_F(MindDataSkipPushdownTestOptimizationPass, SkipPushdownSkip0) {
  root = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())->Project({"label", "image"})->Skip(0);
  root_target = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())->Project({"label", "image"});
  EXPECT_OK(prepare_trees(root, root_target, 0, false));

  root = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())
           ->Skip(0)
           ->Project({"label", "image"})
           ->Skip(0);
  root_target = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())->Project({"label", "image"});
  EXPECT_OK(prepare_trees(root, root_target, 0, false));

  root = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())->Skip(0)->Project({"label", "image"});
  root_target =
    ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())->Skip(1)->Project({"label", "image"});
  EXPECT_OK(prepare_trees(root, root_target, 1, false));

  root = ImageFolder(folder_path, false, std::make_shared<SequentialSampler>())
           ->Skip(2)