!13290 Add a base class for Mappable source ops

From: @hfarahat
Reviewed-by: 
Signed-off-by:
This commit is contained in:
mindspore-ci-bot 2021-03-31 03:09:21 +08:00 committed by Gitee
commit 1edbbe56ba
24 changed files with 352 additions and 1335 deletions

View File

@ -14,6 +14,7 @@ set(DATASET_ENGINE_DATASETOPS_SOURCE_SRC_FILES
clue_op.cc
csv_op.cc
album_op.cc
mappable_leaf_op.cc
)
set(DATASET_ENGINE_DATASETOPS_SOURCE_SRC_FILES

View File

@ -72,17 +72,15 @@ Status AlbumOp::Builder::SanityCheck() {
AlbumOp::AlbumOp(int32_t num_wkrs, int32_t rows_per_buffer, std::string file_dir, int32_t queue_size, bool do_decode,
const std::set<std::string> &exts, std::unique_ptr<DataSchema> data_schema,
std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_wkrs, queue_size, std::move(sampler)),
rows_per_buffer_(rows_per_buffer),
: MappableLeafOp(num_wkrs, queue_size, std::move(sampler), rows_per_buffer),
folder_path_(file_dir),
decode_(do_decode),
extensions_(exts),
data_schema_(std::move(data_schema)),
row_cnt_(0),
buf_cnt_(0),
sampler_ind_(0),
dirname_offset_(0),
sample_ids_(nullptr) {
sample_ids_(nullptr),
curr_row_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
@ -131,97 +129,6 @@ Status AlbumOp::PrescanEntry() {
return Status::OK();
}
// Main logic (master thread functor): prescan the dataset, register queues with the
// TaskGroup and launch all worker threads, then loop once per epoch converting sampler
// output into IOBlocks for the workers. Returns only after the last repeat.
Status AlbumOp::operator()() {
RETURN_IF_NOT_OK(this->PrescanEntry());
RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
std::unique_ptr<DataBuffer> sampler_buffer;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
while (true) { // each iterator is 1 epoch
std::vector<int64_t> keys;
keys.reserve(rows_per_buffer_);
// Drain the sampler until it signals end-of-epoch (eoe).
while (sampler_buffer->eoe() == false) {
TensorRow sample_row;
RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row));
TensorPtr sample_ids = sample_row[0];
for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
if ((*itr) >= num_rows_) continue; // index out of bound, skipping
keys.push_back(*itr);
row_cnt_++;
// Flush a full batch of keys to the next worker's queue (round-robin by buf_cnt_).
if (row_cnt_ % rows_per_buffer_ == 0) {
RETURN_IF_NOT_OK(
io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
keys.clear();
}
}
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
// Flush any partial batch left over at the end of the epoch.
if (keys.empty() == false) {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
}
if (IsLastIteration()) {
// Last repeat: queue EOE then EOF, then an empty key list to each worker as a quit signal.
std::unique_ptr<IOBlock> eoe_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe);
std::unique_ptr<IOBlock> eof_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof);
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block)));
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block)));
for (int32_t i = 0; i < num_workers_; ++i) {
RETURN_IF_NOT_OK(
io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
}
return Status::OK();
} else { // not the last repeat.
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) {
RETURN_IF_NOT_OK(Reset());
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
UpdateRepeatAndEpochCounter();
}
}
// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_
// IMPORTANT: 1 IOBlock produces 1 DataBuffer
Status AlbumOp::WorkerEntry(int32_t worker_id) {
// Signal the task manager that this worker thread has started.
TaskManager::FindMe()->Post();
int64_t buffer_id = worker_id;
std::unique_ptr<IOBlock> io_block;
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
while (io_block != nullptr) {
if (io_block->wait() == true) {
// Sync io_block is a signal that master thread wants us to pause and sync with other workers.
// The last guy who comes to this sync point should reset the counter and wake up the master thread.
if (++num_workers_paused_ == num_workers_) {
wait_for_workers_post_.Set();
}
} else if (io_block->eoe() == true) {
// Propagate end-of-epoch downstream and restart buffer ids for the next epoch.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
buffer_id = worker_id;
} else if (io_block->eof() == true) {
// Propagate end-of-file (pipeline shutdown) downstream.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
} else {
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers
std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
// Stride buffer ids by the worker count so ids stay unique across workers.
buffer_id += num_workers_;
}
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
}
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
}
// Only support JPEG/PNG/GIF/BMP
// Optimization: Could take in a tensor
// This function does not return status because we want to just skip bad input, not crash
@ -443,7 +350,8 @@ Status AlbumOp::LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num,
// to take a reference to a column descriptor?
// the design of this class is to make the code more readable, forgoing minor performance gain like
// getting rid of duplicated checks
Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row) {
Status AlbumOp::LoadTensorRow(row_id_type row_id, TensorRow *row) {
std::string file = image_rows_[row_id];
// testing here is to just print out file path
(*row) = TensorRow(row_id, {});
MS_LOG(INFO) << "Image row file: " << file << ".";
@ -531,19 +439,6 @@ Status AlbumOp::loadColumnData(const std::string &file, int32_t index, nlohmann:
}
}
// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer.
// \param[in] keys row ids taken from the io-block
// \param[in, out] db destination buffer whose tensor table is filled here
// \return Status The status code returned
Status AlbumOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (const auto &row_id : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(this->LoadTensorRow(row_id, image_rows_[row_id], &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
void AlbumOp::Print(std::ostream &out, bool show_all) const {
// Always show the id and name as first line regardless if this summary or detailed print
out << "(" << std::setw(2) << operator_id_ << ") <AlbumOp>:";
@ -561,24 +456,12 @@ void AlbumOp::Print(std::ostream &out, bool show_all) const {
}
}
// Reset Sampler and wakeup Master thread (functor).
// Rewinds the sampler for the next epoch, then clears the row counter.
// \return Status The status code returned
Status AlbumOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  RETURN_IF_NOT_OK(sampler_->ResetSampler());
  row_cnt_ = 0;
  return Status::OK();
}
// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows
// \return Status The status code returned
Status AlbumOp::InitSampler() {
  // Forward whatever status the handshake produces.
  return sampler_->HandshakeRandomAccessOp(this);
}
Status AlbumOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
RETURN_IF_NOT_OK(this->PrescanEntry());
// registers QueueList and individual Queues for interrupt services
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
@ -612,13 +495,13 @@ Status AlbumOp::GetNextRow(TensorRow *row) {
RETURN_IF_NOT_OK(sample_buffer->PopRow(&sample_row));
sample_ids_ = sample_row[0];
}
if (row_cnt_ + 1 > sample_ids_->Size()) {
if (curr_row_ + 1 > sample_ids_->Size()) {
return Status::OK();
}
int64_t key;
sample_ids_->GetItemAt(&key, {row_cnt_});
RETURN_IF_NOT_OK(LoadTensorRow(key, image_rows_[key], row));
row_cnt_++;
RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
RETURN_IF_NOT_OK(LoadTensorRow(key, row));
curr_row_++;
return Status::OK();
}
} // namespace dataset

View File

@ -30,6 +30,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/queue.h"
@ -47,7 +48,7 @@ class Queue;
using FolderImages = std::shared_ptr<std::pair<std::string, std::queue<std::string>>>;
/// \class AlbumOp album_op.h
class AlbumOp : public ParallelOp, public RandomAccessOp {
class AlbumOp : public MappableLeafOp {
public:
class Builder {
public:
@ -171,17 +172,6 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
/// \return Status The status code returned
Status PrescanEntry();
/// \brief Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
/// \param[in] int32_t workerId - id of each worker
/// \return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
/// \brief Main Loop of AlbumOp
/// Master thread: Fill IOBlockQueue, then goes to sleep
/// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
/// \return Status The status code returned
Status operator()() override;
/// \brief A print method typically used for debugging
/// \param[in] out
/// \param[in] show_all
@ -197,10 +187,6 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
std::string Name() const override { return "AlbumOp"; }
private:
/// \brief Initialize Sampler, calls sampler->Init() within
/// \return Status The status code returned
Status InitSampler();
/// \brief Load image to tensor row
/// \param[in] image_file Image name of file
/// \param[in] col_num Column num in schema
@ -265,10 +251,9 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
/// \brief Load a tensor row according to a json file
/// \param[in] row_id_type row_id - id for this tensor row
/// \param[in] ImageColumns file Json file location
/// \param[in, out] TensorRow row Json content stored into a tensor row
/// \return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
/// \brief Load a tensor column according to a json file
/// \param[in] ImageColumns file Json file location
@ -278,23 +263,14 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
/// \return Status The status code returned
Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row);
/// \param[in] const std::vector<int64_t> &keys Keys in ioblock
/// \param[in, out] std::unique_ptr<DataBuffer> db Databuffer to push to
/// \return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
/// \brief Called first when function is called
/// \return Status The status code returned
Status LaunchThreadsAndInitOp();
/// \brief reset Op
/// \return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
Status GetNextRow(TensorRow *row) override;
// Private function for computing the assignment of the column name map.
// @return Status The status code returned
/// Private function for computing the assignment of the column name map.
/// \return Status The status code returned
Status ComputeColMap() override;
int32_t rows_per_buffer_;
@ -303,12 +279,12 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
std::set<std::string> extensions_; // extensions allowed
std::unordered_map<std::string, int32_t> col_name_map_;
std::unique_ptr<DataSchema> data_schema_;
int64_t row_cnt_;
int64_t buf_cnt_;
int64_t sampler_ind_;
int64_t dirname_offset_;
std::vector<std::string> image_rows_;
TensorPtr sample_ids_;
int32_t curr_row_;
};
} // namespace dataset
} // namespace mindspore

View File

@ -79,8 +79,7 @@ Status CelebAOp::Builder::SanityCheck() {
CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::string &dir, int32_t queue_size,
bool decode, const std::string &usage, const std::set<std::string> &exts,
std::unique_ptr<DataSchema> schema, std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_workers, queue_size, std::move(sampler)),
rows_per_buffer_(rows_per_buffer),
: MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer),
folder_path_(dir),
decode_(decode),
extensions_(exts),
@ -269,121 +268,8 @@ std::vector<std::string> CelebAOp::Split(const std::string &line) {
return split;
}
// Main logic (master thread functor): launch worker threads, prime the sampler with the
// first sample buffer, then delegate the per-epoch IOBlock dispatch loop to AddIOBlock.
Status CelebAOp::operator()() {
  RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
  std::unique_ptr<DataBuffer> sample_buffer;
  RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_buffer));
  return AddIOBlock(&sample_buffer);
}
// Fills the IOBlock queues from sampler output, one loop iteration per epoch,
// until the last repeat is reached. Called from operator() on the master thread.
Status CelebAOp::AddIOBlock(std::unique_ptr<DataBuffer> *data_buffer) {
// Dispatch counter: selects which worker queue receives the next block (round-robin).
int64_t buff_count = 0;
while (true) {
std::vector<int64_t> keys;
keys.reserve(rows_per_buffer_);
int64_t row_count = 0;
// Drain the sampler until it signals end-of-epoch (eoe).
while (!(*data_buffer)->eoe()) {
TensorRow sample_row;
RETURN_IF_NOT_OK((*data_buffer)->PopRow(&sample_row));
std::shared_ptr<Tensor> sample_ids = sample_row[0];
for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
if ((*itr) >= num_rows_) {
MS_LOG(WARNING) << "Sample Id (" << *itr << ") is out of bounds, skipping. Max id is " << num_rows_ << ".";
continue;
}
keys.push_back(*itr);
row_count++;
// Flush a full batch of keys to the next worker's queue.
if (row_count % rows_per_buffer_ == 0) {
RETURN_IF_NOT_OK(io_block_queues_[buff_count++ % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
keys.clear();
}
}
RETURN_IF_NOT_OK(sampler_->GetNextSample(data_buffer));
}
// Flush any partial batch left over at the end of the epoch.
if (!keys.empty()) {
RETURN_IF_NOT_OK(io_block_queues_[(buff_count++) % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
}
if (IsLastIteration()) {
// Last repeat: queue EOE then EOF, then an empty key list to each worker as a quit signal.
RETURN_IF_NOT_OK(
io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
RETURN_IF_NOT_OK(
io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
for (int32_t i = 0; i < num_workers_; i++) {
RETURN_IF_NOT_OK(
io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
}
return Status::OK();
} else { // not the last repeat.
RETURN_IF_NOT_OK(
io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) {
RETURN_IF_NOT_OK(Reset());
RETURN_IF_NOT_OK(sampler_->GetNextSample(data_buffer));
}
UpdateRepeatAndEpochCounter();
}
}
// Worker thread: pulls IOBlocks from this worker's queue, loads a DataBuffer per block,
// and pushes it to out_connector_. 1 IOBlock produces 1 DataBuffer.
Status CelebAOp::WorkerEntry(int32_t worker_id) {
// Signal the task manager that this worker thread has started.
TaskManager::FindMe()->Post();
int64_t buffer_id = worker_id;
std::unique_ptr<IOBlock> io_block;
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
while (io_block != nullptr) {
if (io_block->wait() == true) {
// Sync io_block is a signal that master thread wants us to pause and sync with other workers.
// The last guy who comes to this sync point should reset the counter and wake up the master thread.
if (++num_workers_paused_ == num_workers_) {
wait_for_workers_post_.Set();
}
} else if (io_block->eoe() == true) {
// Propagate end-of-epoch downstream and restart buffer ids for the next epoch.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
buffer_id = worker_id;
} else if (io_block->eof() == true) {
// Propagate end-of-file (pipeline shutdown) downstream.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
} else {
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
if (keys.empty()) {
return Status::OK(); // empty key is a quit signal for workers
}
std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
// Stride buffer ids by the worker count so ids stay unique across workers.
buffer_id += num_workers_;
}
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
}
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Unexpected nullptr received in worker.");
}
// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer.
// \param[in] keys row ids taken from the io-block
// \param[in, out] db destination buffer whose tensor table is filled here
// \return Status The status code returned
Status CelebAOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (const auto &row_id : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(row_id, image_labels_vec_[row_id], &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
Status CelebAOp::LoadTensorRow(row_id_type row_id, const std::pair<std::string, std::vector<int32_t>> &image_label,
TensorRow *row) {
Status CelebAOp::LoadTensorRow(row_id_type row_id, TensorRow *row) {
std::pair<std::string, std::vector<int32_t>> &image_label = image_labels_vec_[row_id];
std::shared_ptr<Tensor> image;
std::shared_ptr<Tensor> label;
@ -432,13 +318,6 @@ void CelebAOp::Print(std::ostream &out, bool show_all) const {
}
}
// Reset Sampler and wakeup Master thread (functor).
// Rewinds the sampler so the next epoch re-samples from the start; forwards its status.
// \return Status The status code returned
Status CelebAOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  return sampler_->ResetSampler();
}
Status CelebAOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
@ -41,7 +42,7 @@
namespace mindspore {
namespace dataset {
class CelebAOp : public ParallelOp, RandomAccessOp {
class CelebAOp : public MappableLeafOp {
public:
class Builder {
public:
@ -148,27 +149,11 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
~CelebAOp() override = default;
// Main Loop of CelebAOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t worker_id - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// A print method typically used for debugging
// @param out
// @param show_all
void Print(std::ostream &out, bool show_all) const override;
// Method in operator(), to fill IOBlockQueue
// @param std::unique_ptr<DataBuffer> sampler_buffer - to fill IOBlockQueue
// @return Status The status code returned
Status AddIOBlock(std::unique_ptr<DataBuffer> *data_buffer);
// Op name getter
// @return Name of the current Op
std::string Name() const override { return "CelebAOp"; }
@ -176,7 +161,7 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
private:
// Called first when function is called
// @return
Status LaunchThreadsAndInitOp();
Status LaunchThreadsAndInitOp() override;
// Parse attribute file
// @return
@ -191,32 +176,21 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
// @return std::vector<std::string> - string after split
std::vector<std::string> Split(const std::string &line);
// @param const std::vector<int64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
// Load a tensor row according to a pair
// @param row_id_type row_id - id for this tensor row
// @param std::pair - <image_file,<label>>
// @param TensorRow row - image & label read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::pair<std::string, std::vector<int32_t>> &image_label,
TensorRow *row);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// Check if need read according to dataset type
// @return bool - if need read
bool CheckDatasetTypeValid();
// reset Op
// @return Status The status code returned
Status Reset() override;
// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
int32_t rows_per_buffer_;
std::string folder_path_; // directory of celeba folder
bool decode_;
std::set<std::string> extensions_; // extensions allowed

View File

@ -88,76 +88,16 @@ Status CifarOp::Builder::SanityCheck() {
CifarOp::CifarOp(CifarType type, const std::string &usage, int32_t num_works, int32_t rows_per_buf,
const std::string &file_dir, int32_t queue_size, std::unique_ptr<DataSchema> data_schema,
std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_works, queue_size, std::move(sampler)),
: MappableLeafOp(num_works, queue_size, std::move(sampler), rows_per_buf),
cifar_type_(type),
usage_(usage),
rows_per_buffer_(rows_per_buf),
folder_path_(file_dir),
data_schema_(std::move(data_schema)),
row_cnt_(0),
buf_cnt_(0) {
data_schema_(std::move(data_schema)) {
constexpr uint64_t kUtilQueueSize = 512;
cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize);
io_block_queues_.Init(num_workers_, queue_size);
}
// Main logic (master thread functor): launch all worker threads, then loop once per
// epoch converting sampler output into IOBlocks for the worker queues. Returns only
// after the last repeat.
Status CifarOp::operator()() {
  RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
  std::unique_ptr<DataBuffer> sampler_buffer;
  RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
  while (true) {  // each iteration is 1 epoch
    std::vector<int64_t> keys;
    keys.reserve(rows_per_buffer_);
    // Drain the sampler until it signals end-of-epoch (eoe).
    while (sampler_buffer->eoe() == false) {
      TensorRow sample_row;
      RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row));
      std::shared_ptr<Tensor> sample_ids = sample_row[0];
      for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); itr++) {
        // FIX: validate the id BEFORE queueing it. Previously the key was pushed and
        // row_cnt_ bumped first, so out-of-bound ids were still dispatched to workers
        // (AlbumOp/CelebAOp perform this check before the push).
        if ((*itr) >= num_rows_) continue;  // index out of bound, skipping
        keys.push_back(*itr);
        row_cnt_++;
        // Flush a full batch of keys to the next worker's queue (round-robin).
        if (row_cnt_ % rows_per_buffer_ == 0) {
          RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
            std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
          keys.clear();
        }
      }
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    // Flush any partial batch left over at the end of the epoch.
    if (keys.empty() == false) {
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(
        std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
    }
    if (IsLastIteration()) {
      // Last repeat: queue EOE then EOF, then an empty key list to each worker as a quit signal.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
      for (int32_t i = 0; i < num_workers_; i++) {
        RETURN_IF_NOT_OK(
          io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
      }
      return Status::OK();
    } else {  // not the last repeat.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
    }
    if (epoch_sync_flag_) {
      // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job
      // for the current epoch.
      RETURN_IF_NOT_OK(WaitForWorkers());
    }
    // If not the last repeat, self-reset and go to loop again.
    if (!IsLastIteration()) {
      RETURN_IF_NOT_OK(Reset());
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    UpdateRepeatAndEpochCounter();
  }
}
Status CifarOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
@ -175,43 +115,8 @@ Status CifarOp::LaunchThreadsAndInitOp() {
return Status::OK();
}
// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_
// IMPORTANT: 1 IOBlock produces 1 DataBuffer
Status CifarOp::WorkerEntry(int32_t worker_id) {
// Signal the task manager that this worker thread has started.
TaskManager::FindMe()->Post();
int64_t buffer_id = worker_id;
std::unique_ptr<IOBlock> io_block;
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
while (io_block != nullptr) {
if (io_block->wait() == true) {
// Sync io_block is a signal that master thread wants us to pause and sync with other workers.
// The last guy who comes to this sync point should reset the counter and wake up the master thread.
if (++num_workers_paused_ == num_workers_) {
wait_for_workers_post_.Set();
}
} else if (io_block->eoe() == true) {
// Propagate end-of-epoch downstream and restart buffer ids for the next epoch.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
buffer_id = worker_id;
} else if (io_block->eof() == true) {
// Propagate end-of-file (pipeline shutdown) downstream.
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
} else {
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
if (keys.empty() == true) {
return Status::OK(); // empty key is a quit signal for workers
}
std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
// Stride buffer ids by the worker count so ids stay unique across workers.
buffer_id += num_workers_;
}
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
}
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
}
// Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow in a DataBuffer
Status CifarOp::LoadTensorRow(uint64_t index, TensorRow *trow) {
Status CifarOp::LoadTensorRow(row_id_type index, TensorRow *trow) {
std::shared_ptr<Tensor> label;
std::shared_ptr<Tensor> fine_label;
std::shared_ptr<Tensor> ori_image = cifar_image_label_pairs_[index].first;
@ -234,18 +139,6 @@ Status CifarOp::LoadTensorRow(uint64_t index, TensorRow *trow) {
return Status::OK();
}
// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer.
// \param[in] keys row ids taken from the io-block
// \param[in, out] db destination buffer whose tensor table is filled here
// \return Status The status code returned
Status CifarOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (const auto &row_id : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(row_id, &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
void CifarOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -260,20 +153,6 @@ void CifarOp::Print(std::ostream &out, bool show_all) const {
}
}
// Reset Sampler and wakeup Master thread (functor).
// Rewinds the sampler for the next epoch, then clears the row counter.
// \return Status The status code returned
Status CifarOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  RETURN_IF_NOT_OK(sampler_->ResetSampler());
  row_cnt_ = 0;
  return Status::OK();
}
// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows
// \return Status The status code returned
Status CifarOp::InitSampler() {
  // Forward whatever status the handshake produces.
  return sampler_->HandshakeRandomAccessOp(this);
}
Status CifarOp::ReadCifarBlockDataAsync() {
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(GetCifarFiles());

View File

@ -26,6 +26,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/queue.h"
@ -35,7 +36,7 @@
namespace mindspore {
namespace dataset {
class CifarOp : public ParallelOp, public RandomAccessOp {
class CifarOp : public MappableLeafOp {
public:
enum CifarType { kCifar10, kCifar100 };
@ -142,17 +143,6 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
// Destructor.
~CifarOp() = default;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param uint32_t workerId - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Main Loop of CifarOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// A print method typically used for debugging
// @param out
// @param show_all
@ -170,32 +160,20 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
std::string Name() const override { return "CifarOp"; }
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Load a tensor row according to a pair
// @param uint64_t index - index need to load
// @param TensorRow row - image & label read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(uint64_t index, TensorRow *row);
// @param const std::vector<uint64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
Status LoadTensorRow(row_id_type index, TensorRow *trow) override;
private:
// Read block data from cifar file
// @return
Status ReadCifarBlockDataAsync();
// Called first when function is called
// @return
Status LaunchThreadsAndInitOp();
// reset Op
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// Get cifar files in dir
// @return
@ -223,12 +201,9 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
Status ComputeColMap() override;
CifarType cifar_type_;
int32_t rows_per_buffer_;
std::string folder_path_;
std::unique_ptr<DataSchema> data_schema_;
int64_t row_cnt_;
int64_t buf_cnt_;
const std::string usage_; // can only be either "train" or "test"
std::unique_ptr<Queue<std::vector<unsigned char>>> cifar_raw_data_block_;
std::vector<std::string> cifar_files_;

View File

@ -124,82 +124,15 @@ Status CocoOp::Builder::SanityCheck() {
CocoOp::CocoOp(const TaskType &task_type, const std::string &image_folder_path, const std::string &annotation_path,
int32_t num_workers, int32_t rows_per_buffer, int32_t queue_size, bool decode,
std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_workers, queue_size, std::move(sampler)),
: MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer),
decode_(decode),
row_cnt_(0),
buf_cnt_(0),
task_type_(task_type),
image_folder_path_(image_folder_path),
annotation_path_(annotation_path),
rows_per_buffer_(rows_per_buffer),
data_schema_(std::move(data_schema)) {
io_block_queues_.Init(num_workers_, queue_size);
}
// Walks the sampler's id tensor, batching valid row ids into *keys and flushing a full
// batch to the next worker's IO block queue every rows_per_buffer_ rows.
// \param[in] sample_ids tensor of int64 row ids produced by the sampler
// \param[in, out] keys accumulator for ids not yet dispatched (cleared after each flush)
// \return Status The status code returned
Status CocoOp::TraverseSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys) {
  for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
    // FIX: valid row ids are 0 .. num_rows_ - 1. The previous `>` comparison let the
    // out-of-range id equal to num_rows_ through (sibling leaf ops all use `>=`).
    if ((*itr) >= num_rows_) continue;
    keys->push_back(*itr);
    row_cnt_++;
    if (row_cnt_ % rows_per_buffer_ == 0) {
      RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
        std::make_unique<IOBlock>(IOBlock(*keys, IOBlock::kDeIoBlockNone))));
      keys->clear();
    }
  }
  return Status::OK();
}
// Main logic (master thread functor): launch worker threads, then loop once per epoch
// converting sampler output into IOBlocks for the workers, until the last repeat.
Status CocoOp::operator()() {
RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
std::unique_ptr<DataBuffer> sampler_buffer;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
while (true) {
std::vector<int64_t> keys;
keys.reserve(rows_per_buffer_);
// Drain the sampler until it signals end-of-epoch (eoe).
while (sampler_buffer->eoe() == false) {
std::shared_ptr<Tensor> sample_ids;
RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0));
// Samplers are expected to emit int64 ids; anything else is a pipeline error.
if (sample_ids->type() != DataType(DataType::DE_INT64)) {
RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " +
sample_ids->type().ToString());
}
// TraverseSampleIds batches ids into keys and flushes full batches to the workers.
RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys));
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
// Flush any partial batch left over at the end of the epoch.
if (keys.empty() == false) {
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
}
if (IsLastIteration()) {
// Last repeat: queue EOE then EOF, then an empty key list to each worker as a quit signal.
std::unique_ptr<IOBlock> eoe_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe);
std::unique_ptr<IOBlock> eof_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof);
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block)));
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block)));
for (int32_t i = 0; i < num_workers_; i++) {
RETURN_IF_NOT_OK(
io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
}
return Status::OK();
} else {
// Not the last repeat: only signal end-of-epoch.
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) {
RETURN_IF_NOT_OK(Reset());
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
UpdateRepeatAndEpochCounter();
}
}
void CocoOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -215,14 +148,8 @@ void CocoOp::Print(std::ostream &out, bool show_all) const {
}
}
// Self-reset between repeats: rewind the sampler and restart row counting for the next epoch.
Status CocoOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  Status rc = sampler_->ResetSampler();
  if (rc.IsError()) {
    return rc;
  }
  row_cnt_ = 0;
  return rc;
}
Status CocoOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) {
Status CocoOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
std::string image_id = image_ids_[row_id];
std::shared_ptr<Tensor> image, coordinate;
auto itr = coordinate_map_.find(image_id);
if (itr == coordinate_map_.end()) {
@ -374,48 +301,6 @@ Status CocoOp::LoadMixTensorRow(row_id_type row_id, const std::string &image_id,
return Status::OK();
}
// Build one TensorQTable by loading a tensor row for every sampled key, then attach it to the buffer.
// @param keys - row ids from one IOBlock
// @param db - destination buffer that receives the loaded table
Status CocoOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (int64_t key : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(key, image_ids_[key], &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
// Worker thread: pull IOBlocks pushed by the master, turn each one into a DataBuffer of loaded
// rows (or an EOE/EOF marker buffer) and push it to the output connector.
// @param worker_id - id of this worker, used to select its IOBlock queue and stripe buffer ids
// @return Status The status code returned
Status CocoOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  // Buffer ids are strided by num_workers_ so each worker produces a disjoint id sequence.
  int64_t buffer_id = worker_id;
  std::unique_ptr<IOBlock> io_block;
  RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  while (io_block != nullptr) {
    if (io_block->wait() == true) {
      // Sync io_block is a signal that master thread wants us to pause and sync with other workers.
      // The last guy who comes to this sync point should reset the counter and wake up the master thread.
      if (++num_workers_paused_ == num_workers_) {
        wait_for_workers_post_.Set();
      }
    } else if (io_block->eoe() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
      // Restart buffer numbering for the next epoch.
      buffer_id = worker_id;
    } else if (io_block->eof() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, (std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF))));
    } else {
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
      // An empty key list is the master's quit signal for workers.
      if (keys.empty() == true) return Status::OK();
      std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
      RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
      buffer_id += num_workers_;
    }
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  }
  RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker");
}
template <typename T>
Status CocoOp::SearchNodeInJson(const nlohmann::json &input_tree, std::string node_name, T *output_node) {
auto node = input_tree.find(node_name);
@ -627,11 +512,6 @@ Status CocoOp::CategoriesColumnLoad(const nlohmann::json &categories_tree) {
return Status::OK();
}
// Hand-shake with the sampler so it can query this RandomAccessOp (e.g. for NumRows).
Status CocoOp::InitSampler() { return sampler_->HandshakeRandomAccessOp(this); }
Status CocoOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/kernels/image/image_utils.h"
@ -46,7 +47,7 @@ class Queue;
using CoordinateRow = std::vector<std::vector<float>>;
class CocoOp : public ParallelOp, public RandomAccessOp {
class CocoOp : public MappableLeafOp {
public:
enum class TaskType { Detection = 0, Stuff = 1, Panoptic = 2, Keypoint = 3 };
@ -171,17 +172,6 @@ class CocoOp : public ParallelOp, public RandomAccessOp {
// Destructor
~CocoOp() = default;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Main Loop of CocoOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it the put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// A print method typically used for debugging
// @param out
// @param show_all
@ -212,16 +202,12 @@ class CocoOp : public ParallelOp, public RandomAccessOp {
Status GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) override;
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Load a tensor row according to image id
// @param row_id_type row_id - id for this tensor row
// @param std::string image_id - image id
// @param TensorRow row - image & target read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *row);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// Load a tensor row with vector which a vector to a tensor
// @param row_id_type row_id - id for this tensor row
@ -259,27 +245,13 @@ class CocoOp : public ParallelOp, public RandomAccessOp {
// @return Status The status code returned
Status ReadImageToTensor(const std::string &path, const ColDescriptor &col, std::shared_ptr<Tensor> *tensor);
// @param const std::vector<uint64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
// Read annotation from Annotation folder
// @return Status The status code returned
Status ParseAnnotationIds();
// @param const std::shared_ptr<Tensor> &sample_ids - sample ids of tensor
// @param std::vector<int64_t> *keys - image id
// @return Status The status code returned
Status TraverseSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys);
// Called first when function is called
// @return Status The status code returned
Status LaunchThreadsAndInitOp();
// Reset dataset state
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// @param nlohmann::json image_tree - image tree of json
// @param std::vector<std::string> *image_vec - image id list of json
@ -323,12 +295,9 @@ class CocoOp : public ParallelOp, public RandomAccessOp {
Status ComputeColMap() override;
bool decode_;
int64_t row_cnt_;
int64_t buf_cnt_;
std::string image_folder_path_;
std::string annotation_path_;
TaskType task_type_;
int32_t rows_per_buffer_;
std::unique_ptr<DataSchema> data_schema_;
std::vector<std::string> image_ids_;

View File

@ -68,16 +68,13 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str
bool recursive, bool do_decode, const std::set<std::string> &exts,
const std::map<std::string, int32_t> &map, std::unique_ptr<DataSchema> data_schema,
std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_wkrs, queue_size, std::move(sampler)),
rows_per_buffer_(rows_per_buffer),
: MappableLeafOp(num_wkrs, queue_size, std::move(sampler), rows_per_buffer),
folder_path_(file_dir),
recursive_(recursive),
decode_(do_decode),
extensions_(exts),
class_index_(map),
data_schema_(std::move(data_schema)),
row_cnt_(0),
buf_cnt_(0),
sampler_ind_(0),
dirname_offset_(0) {
folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
@ -125,98 +122,9 @@ Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) {
return Status::OK();
}
// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work.
// Master thread: drains the sampler each epoch, packs row ids into IOBlocks for the workers,
// and emits EOE/EOF control blocks at epoch/dataset boundaries.
// @return Status The status code returned
Status ImageFolderOp::operator()() {
  RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
  std::unique_ptr<DataBuffer> sampler_buffer;
  RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
  while (true) {  // each iterator is 1 epoch
    std::vector<int64_t> keys;
    keys.reserve(rows_per_buffer_);
    // Consume sampler buffers until the sampler signals end-of-epoch.
    while (sampler_buffer->eoe() == false) {
      TensorRow sample_row;
      RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row));
      std::shared_ptr<Tensor> sample_ids = sample_row[0];
      for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
        if ((*itr) >= num_rows_) continue;  // index out of bound, skipping
        keys.push_back(*itr);
        row_cnt_++;
        // A full batch of rows_per_buffer_ ids goes to the next worker queue round-robin.
        if (row_cnt_ % rows_per_buffer_ == 0) {
          RETURN_IF_NOT_OK(
            io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
          keys.clear();
        }
      }
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    // Flush any ids left over from the last (partial) batch of the epoch.
    if (keys.empty() == false) {
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
    }
    if (IsLastIteration()) {
      // Final repeat: send EOE then EOF, then one empty IOBlock per worker as a quit signal.
      std::unique_ptr<IOBlock> eoe_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe);
      std::unique_ptr<IOBlock> eof_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof);
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block)));
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block)));
      for (int32_t i = 0; i < num_workers_; ++i) {
        RETURN_IF_NOT_OK(
          io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
      }
      return Status::OK();
    } else {  // not the last repeat.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
    }
    if (epoch_sync_flag_) {
      // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
      // the current epoch.
      RETURN_IF_NOT_OK(WaitForWorkers());
    }
    // If not the last repeat, self-reset and go to loop again.
    if (!IsLastIteration()) {
      RETURN_IF_NOT_OK(Reset());
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    UpdateRepeatAndEpochCounter();
  }
}
// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_
// IMPORTANT: 1 IOBlock produces 1 DataBuffer
// @param worker_id - id of this worker, used to select its IOBlock queue and stripe buffer ids
// @return Status The status code returned
Status ImageFolderOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  // Buffer ids are strided by num_workers_ so each worker produces a disjoint id sequence.
  int64_t buffer_id = worker_id;
  std::unique_ptr<IOBlock> io_block;
  RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  while (io_block != nullptr) {
    if (io_block->wait() == true) {
      // Sync io_block is a signal that master thread wants us to pause and sync with other workers.
      // The last guy who comes to this sync point should reset the counter and wake up the master thread.
      if (++num_workers_paused_ == num_workers_) {
        wait_for_workers_post_.Set();
      }
    } else if (io_block->eoe() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
      // Restart buffer numbering for the next epoch.
      buffer_id = worker_id;
    } else if (io_block->eof() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
    } else {
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
      if (keys.empty() == true) return Status::OK();  // empty key is a quit signal for workers
      std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
      RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
      buffer_id += num_workers_;
    }
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  }
  RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker");
}
// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer
Status ImageFolderOp::LoadTensorRow(row_id_type row_id, ImageLabelPair pairPtr, TensorRow *trow) {
Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
ImageLabelPair pairPtr = image_label_pairs_[row_id];
std::shared_ptr<Tensor> image, label;
RETURN_IF_NOT_OK(Tensor::CreateScalar(pairPtr->second, &label));
RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pairPtr->first), &image));
@ -233,18 +141,6 @@ Status ImageFolderOp::LoadTensorRow(row_id_type row_id, ImageLabelPair pairPtr,
return Status::OK();
}
// Build one TensorQTable by loading a (image, label) row for every sampled key,
// then attach the table to the output buffer. One call fills exactly one DataBuffer.
Status ImageFolderOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (int64_t key : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(key, image_label_pairs_[key], &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -260,20 +156,6 @@ void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
}
}
// Self-reset between repeats: rewind the sampler and restart row counting for the next epoch.
Status ImageFolderOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  Status rc = sampler_->ResetSampler();
  if (rc.IsError()) {
    return rc;
  }
  row_cnt_ = 0;
  return rc;
}
// Hand-shake with the sampler so it can query this RandomAccessOp (e.g. for NumRows).
Status ImageFolderOp::InitSampler() { return sampler_->HandshakeRandomAccessOp(this); }
// Derived from RandomAccessOp
Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {

View File

@ -29,6 +29,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/kernels/image/image_utils.h"
@ -50,7 +51,7 @@ class Queue;
using ImageLabelPair = std::shared_ptr<std::pair<std::string, int32_t>>;
using FolderImagesPair = std::shared_ptr<std::pair<std::string, std::queue<ImageLabelPair>>>;
class ImageFolderOp : public ParallelOp, public RandomAccessOp {
class ImageFolderOp : public MappableLeafOp {
public:
class Builder {
public:
@ -175,22 +176,11 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
// @return Status The status code returned
Status PrescanMasterEntry(const std::string &dir);
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status PrescanWorkerEntry(int32_t worker_id);
// Main Loop of ImageFolderOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class
// @return Status The status code returned
@ -217,21 +207,12 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
Status GetNumClasses(int64_t *num_classes) override;
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Load a tensor row according to a pair
// @param row_id_type row_id - id for this tensor row
// @param ImageLabelPair pair - <imagefile,label>
// @param TensorRow row - image & label read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, ImageLabelPair pair, TensorRow *row);
// @param const std::vector<int64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// @param std::string & dir - dir to walk all images
// @param int64_t * cnt - number of non folder files under the current dir
@ -244,25 +225,18 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
// Called first when function is called
// @return
Status LaunchThreadsAndInitOp();
// reset Op
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
int32_t rows_per_buffer_;
std::string folder_path_; // directory of image folder
bool recursive_;
bool decode_;
std::set<std::string> extensions_; // extensions allowed
std::map<std::string, int32_t> class_index_;
std::unique_ptr<DataSchema> data_schema_;
int64_t row_cnt_;
int64_t buf_cnt_;
int64_t sampler_ind_;
int64_t dirname_offset_;
std::vector<ImageLabelPair> image_label_pairs_;

View File

@ -67,82 +67,18 @@ Status ManifestOp::Builder::SanityCheck() {
ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string file, int32_t queue_size, bool decode,
const std::map<std::string, int32_t> &class_index, std::unique_ptr<DataSchema> data_schema,
std::shared_ptr<SamplerRT> sampler, std::string usage)
: ParallelOp(num_works, queue_size, std::move(sampler)),
rows_per_buffer_(rows_per_buffer),
: MappableLeafOp(num_works, queue_size, std::move(sampler), rows_per_buffer),
io_block_pushed_(0),
row_cnt_(0),
sampler_ind_(0),
data_schema_(std::move(data_schema)),
file_(file),
class_index_(class_index),
decode_(decode),
usage_(usage),
buf_cnt_(0) {
usage_(usage) {
io_block_queues_.Init(num_workers_, queue_size);
(void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower);
}
// Master entry point: spin up the worker threads, prime the sampler with its first
// buffer, then hand control to the IOBlock-filling loop for the rest of the run.
Status ManifestOp::operator()() {
  RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
  std::unique_ptr<DataBuffer> first_sample;
  RETURN_IF_NOT_OK(sampler_->GetNextSample(&first_sample));
  return AddIoBlock(&first_sample);
}
// Master loop body: drains the sampler each epoch, packs row ids into IOBlocks for the
// workers, and emits EOE/EOF control blocks at epoch/dataset boundaries.
// @param sampler_buffer - the sampler buffer to start from; refilled in place each round
// @return Status The status code returned
Status ManifestOp::AddIoBlock(std::unique_ptr<DataBuffer> *sampler_buffer) {
  while (true) {  // each iterator is 1 epoch
    std::vector<int64_t> keys;
    keys.reserve(rows_per_buffer_);
    // Consume sampler buffers until the sampler signals end-of-epoch.
    while (!(*sampler_buffer)->eoe()) {
      TensorRow sample_row;
      RETURN_IF_NOT_OK((*sampler_buffer)->PopRow(&sample_row));
      std::shared_ptr<Tensor> sample_ids = sample_row[0];
      for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
        if ((*itr) >= num_rows_) continue;  // index out of bound, skipping
        keys.push_back(*itr);
        row_cnt_++;
        // A full batch of rows_per_buffer_ ids goes to the next worker queue round-robin.
        if (row_cnt_ % rows_per_buffer_ == 0) {
          RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
            std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
          keys.clear();
        }
      }
      RETURN_IF_NOT_OK(sampler_->GetNextSample(sampler_buffer));
    }
    // Flush any ids left over from the last (partial) batch of the epoch.
    if (keys.empty() == false) {
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(
        std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
    }
    if (IsLastIteration()) {
      // Final repeat: send EOE then EOF, then one empty IOBlock per worker as a quit signal.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
      for (int32_t i = 0; i < num_workers_; i++) {
        RETURN_IF_NOT_OK(
          io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
      }
      return Status::OK();
    } else {
      // Not the last repeat: only an EOE marker separates epochs.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
    }
    if (epoch_sync_flag_) {
      // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
      // the current epoch.
      RETURN_IF_NOT_OK(WaitForWorkers());
    }
    // If not the last repeat, self-reset and go to loop again.
    if (!IsLastIteration()) {
      RETURN_IF_NOT_OK(Reset());
      RETURN_IF_NOT_OK(sampler_->GetNextSample(sampler_buffer));
    }
    UpdateRepeatAndEpochCounter();
  }
}
Status ManifestOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
@ -159,44 +95,9 @@ Status ManifestOp::LaunchThreadsAndInitOp() {
return Status::OK();
}
// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_
// IMPORTANT: 1 IOBlock produces 1 DataBuffer
// @param worker_id - id of this worker, used to select its IOBlock queue and stripe buffer ids
// @return Status The status code returned
Status ManifestOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  // Buffer ids are strided by num_workers_ so each worker produces a disjoint id sequence.
  int64_t buffer_id = worker_id;
  std::unique_ptr<IOBlock> io_block;
  RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  while (io_block != nullptr) {
    if (io_block->wait() == true) {
      // Sync io_block is a signal that master thread wants us to pause and sync with other workers.
      // The last guy who comes to this sync point should reset the counter and wake up the master thread.
      if (++num_workers_paused_ == num_workers_) {
        wait_for_workers_post_.Set();
      }
    } else if (io_block->eoe() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
      // Restart buffer numbering for the next epoch.
      buffer_id = worker_id;
    } else if (io_block->eof() == true) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
    } else {
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
      if (keys.empty()) {
        return Status::OK();  // empty key is a quit signal for workers
      }
      std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
      RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
      buffer_id += num_workers_;
    }
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  }
  RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
}
// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer
Status ManifestOp::LoadTensorRow(row_id_type row_id, const std::pair<std::string, std::vector<std::string>> &data,
TensorRow *trow) {
Status ManifestOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
std::pair<std::string, std::vector<std::string>> data = image_labelname_[static_cast<size_t>(row_id)];
std::shared_ptr<Tensor> image;
std::shared_ptr<Tensor> label;
std::vector<int32_t> label_index(data.second.size());
@ -222,18 +123,6 @@ Status ManifestOp::LoadTensorRow(row_id_type row_id, const std::pair<std::string
return Status::OK();
}
// Build one TensorQTable by loading a (image, label) row for every sampled key,
// then attach the table to the output buffer. One call fills exactly one DataBuffer.
Status ManifestOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto table = std::make_unique<TensorQTable>();
  for (int64_t key : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(key, image_labelname_[static_cast<size_t>(key)], &row));
    table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(table));
  return Status::OK();
}
void ManifestOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -249,20 +138,6 @@ void ManifestOp::Print(std::ostream &out, bool show_all) const {
}
}
// Self-reset between repeats: rewind the sampler and restart row counting for the next epoch.
Status ManifestOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  Status rc = sampler_->ResetSampler();
  if (rc.IsError()) {
    return rc;
  }
  row_cnt_ = 0;
  return rc;
}
// Hand-shake with the sampler so it can query this RandomAccessOp (e.g. for NumRows).
Status ManifestOp::InitSampler() { return sampler_->HandshakeRandomAccessOp(this); }
// Derived from RandomAccessOp
Status ManifestOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
if (cls_ids == nullptr || !cls_ids->empty() || image_labelname_.empty()) {

View File

@ -26,6 +26,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/kernels/image/image_utils.h"
#include "minddata/dataset/util/queue.h"
@ -35,7 +36,7 @@
namespace mindspore {
namespace dataset {
class ManifestOp : public ParallelOp, public RandomAccessOp {
class ManifestOp : public MappableLeafOp {
public:
class Builder {
public:
@ -143,17 +144,6 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
// Destructor.
~ManifestOp() = default;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t worker_id - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Main Loop of ManifestOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class
// @return Status The status code returned
@ -194,27 +184,12 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
Status GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) override;
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Method in operator(), to fill IOBlockQueue
// @param std::unique_ptr<DataBuffer> sampler_buffer - to fill IOBlockQueue
// @return Status The status code returned
Status AddIoBlock(std::unique_ptr<DataBuffer> *sampler_buffer);
// Load a tensor row according to a pair
// @param row_id_type row_id - id for this tensor row
// @param std::pair<std::string, std::vector<std::string>> - <imagefile, <label1, label2...>>
// @param TensorRow row - image & label read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::pair<std::string, std::vector<std::string>> &data,
TensorRow *row);
// @param const std::vector<int64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// Parse manifest file to get image path and label and so on.
// @return Status The status code returned
@ -222,11 +197,7 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
// Called first when function is called
// @return Status The status code returned
Status LaunchThreadsAndInitOp();
// reset Op
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// Check if image ia valid.Only support JPEG/PNG/GIF/BMP
// @return
@ -240,16 +211,13 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
// @return - Status
Status ComputeColMap() override;
int32_t rows_per_buffer_;
int64_t io_block_pushed_;
int64_t row_cnt_;
int64_t sampler_ind_;
std::unique_ptr<DataSchema> data_schema_;
std::string file_; // file that store the information of images
std::map<std::string, int32_t> class_index_;
bool decode_;
std::string usage_;
int64_t buf_cnt_;
std::map<std::string, int32_t> label_index_;
std::vector<std::pair<std::string, std::vector<std::string>>> image_labelname_;

View File

@ -0,0 +1,152 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include <fstream>
#include <unordered_set>
#include "utils/ms_utils.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/tensor_shape.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
namespace mindspore {
namespace dataset {
// Base constructor for mappable (random-access) leaf ops: forwards the threading/queue
// parameters and sampler to ParallelOp and zeroes the row/buffer bookkeeping counters
// used by the master loop in operator().
MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler,
                               int32_t rows_per_buffer)
    : ParallelOp(num_wkrs, queue_size, std::move(sampler)),
      row_cnt_(0),
      buf_cnt_(0),
      rows_per_buffer_(rows_per_buffer) {}
// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work.
// Shared master loop for all mappable leaf ops: drains the sampler each epoch, packs row
// ids into IOBlocks for the workers, and emits EOE/EOF control blocks at boundaries.
// @return Status The status code returned
Status MappableLeafOp::operator()() {
  RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
  std::unique_ptr<DataBuffer> sampler_buffer;
  RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
  while (true) {  // each iterator is 1 epoch
    std::vector<int64_t> keys;
    keys.reserve(rows_per_buffer_);
    // Consume sampler buffers until the sampler signals end-of-epoch.
    while (sampler_buffer->eoe() == false) {
      TensorRow sample_row;
      RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row));
      std::shared_ptr<Tensor> sample_ids = sample_row[0];
      for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
        if ((*itr) >= num_rows_) continue;  // index out of bound, skipping
        keys.push_back(*itr);
        row_cnt_++;
        // A full batch of rows_per_buffer_ ids goes to the next worker queue round-robin.
        if (row_cnt_ % rows_per_buffer_ == 0) {
          RETURN_IF_NOT_OK(
            io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
          keys.clear();
        }
      }
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    // Flush any ids left over from the last (partial) batch of the epoch.
    if (keys.empty() == false) {
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone)));
    }
    if (IsLastIteration()) {
      // Final repeat: send EOE then EOF, then one empty IOBlock per worker as a quit signal.
      std::unique_ptr<IOBlock> eoe_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe);
      std::unique_ptr<IOBlock> eof_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof);
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block)));
      RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block)));
      for (int32_t i = 0; i < num_workers_; ++i) {
        RETURN_IF_NOT_OK(
          io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
      }
      return Status::OK();
    } else {  // not the last repeat.
      RETURN_IF_NOT_OK(
        io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
    }
    if (epoch_sync_flag_) {
      // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
      // the current epoch.
      RETURN_IF_NOT_OK(WaitForWorkers());
    }
    // If not the last repeat, self-reset and go to loop again.
    if (!IsLastIteration()) {
      RETURN_IF_NOT_OK(Reset());
      RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
    }
    UpdateRepeatAndEpochCounter();
  }
}
// Restart the sampler between repeats so the next epoch of sample ids can be produced.
// Invoked by the master thread (functor) during a self-reset.
Status MappableLeafOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  const Status rc = sampler_->ResetSampler();
  RETURN_IF_NOT_OK(rc);
  return Status::OK();
}
// Handshake with the sampler: gives it a back-pointer to this RandomAccessOp so it can
// query row counts (e.g. NumRows) when generating sample ids.
Status MappableLeafOp::InitSampler() { return sampler_->HandshakeRandomAccessOp(this); }
// Worker thread body: repeatedly pulls an IOBlock from this worker's queue, converts it
// into exactly one DataBuffer and pushes that to out_connector_.
// IMPORTANT: 1 IOBlock produces 1 DataBuffer
Status MappableLeafOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  int64_t buffer_id = worker_id;
  std::unique_ptr<IOBlock> block;
  RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&block));
  while (block != nullptr) {
    if (block->wait()) {
      // Sync block: master wants all workers paused at this point. The last worker to
      // arrive wakes the master back up.
      if (++num_workers_paused_ == num_workers_) {
        wait_for_workers_post_.Set();
      }
    } else if (block->eoe()) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
      buffer_id = worker_id;  // restart the buffer-id sequence for the next epoch
    } else if (block->eof()) {
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
    } else {
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(block->GetKeys(&keys));
      if (keys.empty()) {
        return Status::OK();  // an empty key list is the quit signal for workers
      }
      auto db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
      RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
      buffer_id += num_workers_;
    }
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&block));
  }
  RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker");
}
// Builds one DataBuffer by calling the derived op's LoadTensorRow once per key.
// @param keys - row ids taken from the IOBlock
// @param db - pre-allocated buffer that receives the loaded tensor table
Status MappableLeafOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
  auto tensor_table = std::make_unique<TensorQTable>();
  for (const int64_t &key : keys) {
    TensorRow row;
    RETURN_IF_NOT_OK(LoadTensorRow(key, &row));
    tensor_table->push_back(std::move(row));
  }
  (*db)->set_tensor_table(std::move(tensor_table));
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -0,0 +1,110 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_
#include <deque>
#include <memory>
#include <queue>
#include <string>
#include <algorithm>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/kernels/image/image_utils.h"
#else
#include "minddata/dataset/kernels/image/lite_image_utils.h"
#endif
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/services.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/wait_post.h"
namespace mindspore {
namespace dataset {
// Forward declares
template <typename T>
class Queue;
using ImageLabelPair = std::shared_ptr<std::pair<std::string, int32_t>>;
using FolderImagesPair = std::shared_ptr<std::pair<std::string, std::queue<ImageLabelPair>>>;
// Base class for all mappable (random-access) leaf source ops. The master thread
// (operator()) turns sampler ids into IOBlocks; worker threads (WorkerEntry) turn each
// IOBlock into one DataBuffer via the derived op's LoadTensorRow.
class MappableLeafOp : public ParallelOp, public RandomAccessOp {
 public:
  // Constructor
  // @param int32_t num_wkrs - Num of workers reading rows in parallel
  // @param int32_t queue_size - connector queue size
  // @param std::shared_ptr<SamplerRT> sampler - sampler tells the source what to read
  // @param int32_t rows_per_buffer - number of rows in each output buffer
  MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler, int32_t rows_per_buffer);

  // Destructor.
  ~MappableLeafOp() = default;

  // Main Loop of MappableLeafOp
  // Master thread: Fill IOBlockQueue, then goes to sleep
  // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
  // @return Status The status code returned
  Status operator()() override;

  // Op name getter
  // @return Name of the current Op
  std::string Name() const override { return "MappableLeafOp"; }

 protected:
  // Handshake with the sampler, allowing it to call this RandomAccessOp's functions
  // (e.g. to get NumRows)
  // @return Status The status code returned
  Status InitSampler();

  // Called first when the functor is invoked; derived ops register queues, launch
  // worker threads and initialize op-specific state here.
  // @return Status The status code returned
  virtual Status LaunchThreadsAndInitOp() = 0;

  // Worker loop: pull IOBlocks, load one DataBuffer per block and push it downstream
  // @param int32_t worker_id - id of the calling worker
  // @return Status The status code returned
  Status WorkerEntry(int32_t worker_id) override;

  // Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer
  // @param const std::vector<int64_t> &keys - keys in ioblock
  // @param std::unique_ptr<DataBuffer> db - output buffer
  // @return Status The status code returned
  Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);

  // Load a tensor row for the given row id (implemented by each derived source op)
  // @param row_id_type row_id - id for this tensor row
  // @param TensorRow row - loaded row
  // @return Status The status code returned
  virtual Status LoadTensorRow(row_id_type row_id, TensorRow *row) = 0;

  // Reset the sampler so the next epoch can start; wakes up the master thread
  // @return Status The status code returned
  Status Reset() override;

  int32_t rows_per_buffer_;  // requested number of rows per output buffer
  int64_t row_cnt_;          // count of rows dispatched to IOBlocks so far
  int64_t buf_cnt_;          // count of IOBlocks dispatched (round-robin queue index)
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/log_adapter.h"
@ -115,16 +116,14 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buf
const std::vector<std::string> &columns_to_load,
const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes)
: ParallelOp(num_mind_record_workers, op_connector_queue_size),
rows_per_buffer_(rows_per_buffer),
: MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::make_shared<SequentialSamplerRT>(0, 0),
rows_per_buffer),
dataset_file_(dataset_file),
load_dataset_(load_dataset),
columns_to_load_(columns_to_load),
operators_(operators),
num_mind_record_workers_(num_mind_record_workers),
num_rows_(0),
buffers_needed_(0),
buf_cnt_(0),
ended_worker_(0),
num_padded_(num_padded),
sample_json_(sample_json),
@ -379,61 +378,19 @@ Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint
return Status::OK();
}
// Class functor operator () override.
// All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work
// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work
Status MindRecordOp::operator()() {
RETURN_IF_NOT_OK(LaunchThreadAndInitOp());
num_rows_ = shard_reader_->GetNumRows();
// Compute how many buffers we would need to accomplish rowsPerBuffer
buffers_needed_ = (num_rows_ + rows_per_buffer_ - 1) / rows_per_buffer_;
while (true) { // each iterator is 1 epoch
for (int32_t i = 0; i < buffers_needed_; ++i) {
std::vector<int64_t> keys(1, i);
RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
}
if (IsLastIteration()) {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
for (int32_t i = 0; i < num_workers_; i++) {
RETURN_IF_NOT_OK(io_block_queues_[i]->Add(
std::move(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone))));
}
return Status::OK();
} else {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) RETURN_IF_NOT_OK(Reset());
UpdateRepeatAndEpochCounter();
}
}
// Overrides base class reset method. When an operator does a reset, it cleans up any state
// info from it's previous execution and then initializes itself so that it can be executed
// again.
Status MindRecordOp::Reset() {
MS_LOG(DEBUG) << Name() << " performing a self-reset.";
RETURN_IF_NOT_OK(ParallelOp::Reset()); // Call our super class reset first.
RETURN_IF_NOT_OK(MappableLeafOp::Reset()); // Call our super class reset first.
shard_reader_->ShuffleTask();
return Status::OK();
}
Status MindRecordOp::LaunchThreadAndInitOp() {
Status MindRecordOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
}
@ -446,6 +403,8 @@ Status MindRecordOp::LaunchThreadAndInitOp() {
// Launch main workers that load DataBuffers by reading all images
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id()));
num_rows_ = shard_reader_->GetNumRows();
RETURN_IF_NOT_OK(this->InitSampler()); // pass numRows to Sampler
TaskManager::FindMe()->Post();
return Status::OK();
}

View File

@ -28,7 +28,7 @@
#include <vector>
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/status.h"
#include "minddata/mindrecord/include/shard_column.h"
@ -50,7 +50,7 @@ using ShardTuple = std::vector<std::tuple<std::vector<uint8_t>, mindrecord::json
const int32_t LOG_INTERVAL = 19;
class MindRecordOp : public ParallelOp {
class MindRecordOp : public MappableLeafOp {
public:
// The nested builder class inside of the MindRecordOp is used to help manage all of the arguments
// for constructing it. Use the builder by setting each argument with the provided set methods,
@ -167,15 +167,9 @@ class MindRecordOp : public ParallelOp {
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Class functor operator () override.
// All DatasetOps operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work.
// @return Status The status code returned
Status operator()() override;
// Called first when function is called
// @return
Status LaunchThreadAndInitOp();
Status LaunchThreadsAndInitOp() override;
// Overrides base class reset method. When an operator does a reset, it cleans up any state
// info from it's previous execution and then initializes itself so that it can be executed
@ -183,15 +177,9 @@ class MindRecordOp : public ParallelOp {
// @return Status The status code returned
Status Reset() override;
// Getter method
int32_t num_rows() const { return num_rows_; }
static Status CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded);
// Getter method
int32_t rows_per_buffer() const { return rows_per_buffer_; }
// Getter method
std::vector<std::string> dataset_file() const { return dataset_file_; }
@ -216,19 +204,19 @@ class MindRecordOp : public ParallelOp {
Status LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
const mindrecord::json &columns_json, const mindrecord::TaskType task_type);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override {
return Status(StatusCode::kMDSyntaxError, "Cannot call this method.");
}
// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
int32_t rows_per_buffer_; // The number of requested rows per buffer.
std::vector<std::string> dataset_file_; // dataset files
bool load_dataset_; // load dataset from single file or not
std::vector<std::string> columns_to_load_; // Columns to load from dataset
std::vector<std::shared_ptr<ShardOperator>> operators_; // ShardOperators to use
int32_t num_mind_record_workers_; // number of workers to be spawned by ShardReader
int32_t buffers_needed_; // Counter for the buffers that were fetched
int64_t buf_cnt_; // Buffer counter
int32_t num_rows_; // One more than the last row id in the range for this cache
std::atomic<int32_t> ended_worker_;
int64_t num_padded_;

View File

@ -75,117 +75,18 @@ Status MnistOp::Builder::SanityCheck() {
MnistOp::MnistOp(const std::string &usage, int32_t num_workers, int32_t rows_per_buffer, std::string folder_path,
int32_t queue_size, std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_workers, queue_size, std::move(sampler)),
: MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer),
usage_(usage),
buf_cnt_(0),
row_cnt_(0),
folder_path_(folder_path),
rows_per_buffer_(rows_per_buffer),
image_path_({}),
label_path_({}),
data_schema_(std::move(data_schema)) {
io_block_queues_.Init(num_workers, queue_size);
}
Status MnistOp::TraversalSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys) {
for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
if ((*itr) >= num_rows_) continue; // index out of bound, skipping
keys->push_back(*itr);
row_cnt_++;
if (row_cnt_ % rows_per_buffer_ == 0) {
RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(*keys, IOBlock::kDeIoBlockNone))));
keys->clear();
}
}
return Status::OK();
}
// functor that contains the main logic of MNIST op
Status MnistOp::operator()() {
RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
std::unique_ptr<DataBuffer> sampler_buffer;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
while (true) { // each iterator is 1 epoch
std::vector<int64_t> keys;
keys.reserve(rows_per_buffer_);
while (sampler_buffer->eoe() == false) {
std::shared_ptr<Tensor> sample_ids;
RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0));
if (sample_ids->type() != DataType(DataType::DE_INT64)) {
RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " +
sample_ids->type().ToString());
}
RETURN_IF_NOT_OK(TraversalSampleIds(sample_ids, &keys));
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
if (keys.empty() == false) {
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
}
if (IsLastIteration()) {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
for (int32_t i = 0; i < num_workers_; ++i) {
RETURN_IF_NOT_OK(
io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
}
return Status::OK();
} else {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) {
RETURN_IF_NOT_OK(Reset());
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
UpdateRepeatAndEpochCounter();
}
}
// contains the logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_
Status MnistOp::WorkerEntry(int32_t worker_id) {
TaskManager::FindMe()->Post();
int64_t buffer_id = worker_id;
std::unique_ptr<IOBlock> iOBlock;
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&iOBlock));
while (iOBlock != nullptr) {
if (iOBlock->wait() == true) {
// Sync io_block is a signal that master thread wants us to pause and sync with other workers.
// The last guy who comes to this sync point should reset the counter and wake up the master thread.
if (++num_workers_paused_ == num_workers_) {
wait_for_workers_post_.Set();
}
} else if (iOBlock->eoe() == true) {
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
buffer_id = worker_id;
} else if (iOBlock->eof() == true) {
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
} else {
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(iOBlock->GetKeys(&keys));
if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers
std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
buffer_id += num_workers_;
}
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&iOBlock));
}
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
}
// Load 1 TensorRow (image,label) using 1 MnistLabelPair.
Status MnistOp::LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pair, TensorRow *trow) {
Status MnistOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
MnistLabelPair mnist_pair = image_label_pairs_[row_id];
std::shared_ptr<Tensor> image, label;
// make a copy of cached tensor
RETURN_IF_NOT_OK(Tensor::CreateFromTensor(mnist_pair.first, &image));
@ -196,18 +97,6 @@ Status MnistOp::LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pa
return Status::OK();
}
// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer
Status MnistOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
std::unique_ptr<TensorQTable> deq = std::make_unique<TensorQTable>();
TensorRow trow;
for (const int64_t &key : keys) {
RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_label_pairs_[key], &trow));
deq->push_back(std::move(trow));
}
(*db)->set_tensor_table(std::move(deq));
return Status::OK();
}
void MnistOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -222,20 +111,6 @@ void MnistOp::Print(std::ostream &out, bool show_all) const {
}
}
// Reset Sampler and wakeup Master thread (functor)
Status MnistOp::Reset() {
MS_LOG(DEBUG) << Name() << " performing a self-reset.";
RETURN_IF_NOT_OK(sampler_->ResetSampler());
row_cnt_ = 0;
return Status::OK();
}
// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows
Status MnistOp::InitSampler() {
RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));
return Status::OK();
}
// Derived from RandomAccessOp
Status MnistOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/queue.h"
@ -41,7 +42,7 @@ class Queue;
using MnistLabelPair = std::pair<std::shared_ptr<Tensor>, uint32_t>;
class MnistOp : public ParallelOp, public RandomAccessOp {
class MnistOp : public MappableLeafOp {
public:
class Builder {
public:
@ -131,17 +132,6 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
// Destructor.
~MnistOp() = default;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t worker_id - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Main Loop of MnistOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::map<uint64_t, std::vector<uint64_t >> * map - key label, val all ids for this class
// @return Status The status code returned
@ -163,27 +153,12 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
std::string Name() const override { return "MnistOp"; }
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Load a tensor row according to a pair
// @param row_id_type row_id - id for this tensor row
// @param ImageLabelPair pair - <imagefile,label>
// @param TensorRow row - image & label read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pair, TensorRow *row);
// @param const std::vector<int64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
// Iterate through all members in sampleIds and fill them into IOBlock.
// @param std::shared_ptr<Tensor> sample_ids -
// @param std::vector<int64_t> *keys - keys in ioblock
// @return Status The status code returned
Status TraversalSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// Check image file stream.
// @param const std::string *file_name - image file name
@ -226,20 +201,13 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
// Called first when function is called
// @return Status The status code returned
Status LaunchThreadsAndInitOp();
// reset Op
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
int64_t buf_cnt_;
int64_t row_cnt_;
std::string folder_path_; // directory of image folder
int32_t rows_per_buffer_;
const std::string usage_; // can only be either "train" or "test"
std::unique_ptr<DataSchema> data_schema_;
std::vector<MnistLabelPair> image_label_pairs_;

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2019-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,7 +26,7 @@ Status RandomAccessOp::GetNumRowsInDataset(int64_t *num) const {
// after it has interacted with it's storage layers.
// Here, it is just a getter method to return the value. However, it is invalid if there is
// not a value set for this count, so generate a failure if that is the case.
if (num == nullptr || num_rows_ == 0) {
if (num == nullptr || num_rows_ == -1) {
RETURN_STATUS_UNEXPECTED("RandomAccessOp has not computed its num rows yet.");
}
(*num) = num_rows_;
@ -70,9 +70,6 @@ Status SamplerRT::HandshakeRandomAccessOp(const RandomAccessOp *op) {
}
Status SamplerRT::CreateSamplerTensor(std::shared_ptr<Tensor> *sample_ids, int64_t num_elements) {
if (num_elements == 0) {
RETURN_STATUS_UNEXPECTED("Invalid data, num of elements cannot be 0.");
}
if (col_desc_ == nullptr) {
// a ColDescriptor for Tensor that holds SampleIds
col_desc_ = std::make_unique<ColDescriptor>("sampleIds", DataType(DataType::DE_INT64), TensorImpl::kFlexible, 1);

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2019-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -70,7 +70,7 @@ Status SequentialSamplerRT::InitSampler() {
CHECK_FAIL_RETURN_UNEXPECTED(start_index_ >= 0,
"Invalid parameter, start_index must be greater than or equal to 0, but got " +
std::to_string(start_index_) + ".\n");
CHECK_FAIL_RETURN_UNEXPECTED(start_index_ < num_rows_,
CHECK_FAIL_RETURN_UNEXPECTED(start_index_ < num_rows_ || (num_rows_ == 0 && start_index_ == 0),
"Invalid parameter, start_index must be less than num_rows, but got start_index: " +
std::to_string(start_index_) + ", num_rows: " + std::to_string(num_rows_) + ".\n");
CHECK_FAIL_RETURN_UNEXPECTED(num_samples_ >= 0,
@ -83,7 +83,7 @@ Status SequentialSamplerRT::InitSampler() {
num_samples_ = available_row_count;
}
CHECK_FAIL_RETURN_UNEXPECTED(
num_samples_ > 0 && samples_per_buffer_ > 0,
(num_samples_ > 0 && samples_per_buffer_ > 0) || num_samples_ == 0,
"Invalid parameter, samples_per_buffer must be greater than 0, but got " + std::to_string(samples_per_buffer_));
samples_per_buffer_ = samples_per_buffer_ > num_samples_ ? num_samples_ : samples_per_buffer_;

View File

@ -99,83 +99,16 @@ VOCOp::VOCOp(const TaskType &task_type, const std::string &task_mode, const std:
const std::map<std::string, int32_t> &class_index, int32_t num_workers, int32_t rows_per_buffer,
int32_t queue_size, bool decode, std::unique_ptr<DataSchema> data_schema,
std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_workers, queue_size, std::move(sampler)),
: MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer),
decode_(decode),
row_cnt_(0),
buf_cnt_(0),
task_type_(task_type),
usage_(task_mode),
folder_path_(folder_path),
class_index_(class_index),
rows_per_buffer_(rows_per_buffer),
data_schema_(std::move(data_schema)) {
io_block_queues_.Init(num_workers_, queue_size);
}
Status VOCOp::TraverseSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys) {
for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
if ((*itr) > num_rows_) continue;
keys->push_back(*itr);
row_cnt_++;
if (row_cnt_ % rows_per_buffer_ == 0) {
RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(*keys, IOBlock::kDeIoBlockNone))));
keys->clear();
}
}
return Status::OK();
}
Status VOCOp::operator()() {
RETURN_IF_NOT_OK(LaunchThreadsAndInitOp());
std::unique_ptr<DataBuffer> sampler_buffer;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
while (true) {
std::vector<int64_t> keys;
keys.reserve(rows_per_buffer_);
while (sampler_buffer->eoe() == false) {
std::shared_ptr<Tensor> sample_ids;
RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0));
if (sample_ids->type() != DataType(DataType::DE_INT64)) {
RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " +
sample_ids->type().ToString());
}
RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys));
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
if (keys.empty() == false) {
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
}
if (IsLastIteration()) {
std::unique_ptr<IOBlock> eoe_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe);
std::unique_ptr<IOBlock> eof_block = std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof);
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block)));
RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block)));
for (int32_t i = 0; i < num_workers_; i++) {
RETURN_IF_NOT_OK(
io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
}
return Status::OK();
} else {
RETURN_IF_NOT_OK(
io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
}
if (epoch_sync_flag_) {
// If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for
// the current epoch.
RETURN_IF_NOT_OK(WaitForWorkers());
}
// If not the last repeat, self-reset and go to loop again.
if (!IsLastIteration()) {
RETURN_IF_NOT_OK(Reset());
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
}
UpdateRepeatAndEpochCounter();
}
}
void VOCOp::Print(std::ostream &out, bool show_all) const {
if (!show_all) {
// Call the super class for displaying any common 1-liner info
@ -191,14 +124,8 @@ void VOCOp::Print(std::ostream &out, bool show_all) const {
}
}
Status VOCOp::Reset() {
MS_LOG(DEBUG) << Name() << " performing a self-reset.";
RETURN_IF_NOT_OK(sampler_->ResetSampler());
row_cnt_ = 0;
return Status::OK();
}
Status VOCOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) {
Status VOCOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
std::string image_id = image_ids_[row_id];
if (task_type_ == TaskType::Segmentation) {
std::shared_ptr<Tensor> image, target;
const std::string kImageFile =
@ -226,48 +153,6 @@ Status VOCOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, Ten
return Status::OK();
}
Status VOCOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
std::unique_ptr<TensorQTable> deq = std::make_unique<TensorQTable>();
TensorRow trow;
for (const uint64_t &key : keys) {
RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_ids_[key], &trow));
deq->push_back(std::move(trow));
}
(*db)->set_tensor_table(std::move(deq));
return Status::OK();
}
// Worker thread loop. Each worker repeatedly pops IOBlocks from its own queue
// and services them until shutdown:
//  * wait block - sync point requested by the master; the last worker to
//    arrive wakes the master thread via wait_for_workers_post_.
//  * eoe block  - forward an end-of-epoch marker buffer downstream.
//  * eof block  - forward an end-of-data marker buffer downstream.
//  * data block - load the rows for the block's keys into a DataBuffer and
//    push it to the output connector.
// @param int32_t worker_id - id of this worker (also indexes its IOBlock queue)
// @return Status The status code returned
Status VOCOp::WorkerEntry(int32_t worker_id) {
// Tell the task manager this thread is up and running.
TaskManager::FindMe()->Post();
int64_t buffer_id = worker_id;
std::unique_ptr<IOBlock> io_block;
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
while (io_block != nullptr) {
if (io_block->wait() == true) {
// Sync io_block is a signal that master thread wants us to pause and sync with other workers.
// The last guy who comes to this sync point should reset the counter and wake up the master thread.
if (++num_workers_paused_ == num_workers_) {
wait_for_workers_post_.Set();
}
} else if (io_block->eoe() == true) {
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
// Restart this worker's buffer-id sequence for the next epoch.
buffer_id = worker_id;
} else if (io_block->eof() == true) {
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, (std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF))));
} else {
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
// An empty key list is the normal shutdown signal for this worker.
if (keys.empty() == true) return Status::OK();
std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
RETURN_IF_NOT_OK(LoadBuffer(keys, &db));
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
// Stride by the worker count so buffer ids never collide across workers.
buffer_id += num_workers_;
}
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
}
// The queue should never hand out a nullptr block; treat that as an error.
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker");
}
Status VOCOp::ParseImageIds() {
std::string image_sets_file;
if (task_type_ == TaskType::Segmentation) {
@ -378,11 +263,6 @@ Status VOCOp::ParseAnnotationBbox(const std::string &path) {
return Status::OK();
}
// Registers this op with its sampler so the sampler can perform random
// access against it (e.g. query the number of rows).
// @return Status The status code returned
Status VOCOp::InitSampler() {
  // The handshake already yields Status::OK() on success, so its result can
  // be propagated to the caller directly.
  return sampler_->HandshakeRandomAccessOp(this);
}
Status VOCOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");

View File

@ -27,6 +27,7 @@
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/kernels/image/image_utils.h"
#include "minddata/dataset/util/path.h"
@ -45,7 +46,7 @@ class Queue;
using Annotation = std::vector<std::pair<std::string, std::vector<float>>>;
class VOCOp : public ParallelOp, public RandomAccessOp {
class VOCOp : public MappableLeafOp {
public:
enum class TaskType { Segmentation = 0, Detection = 1 };
@ -175,17 +176,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
// Destructor
~VOCOp() = default;
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;
// Main Loop of VOCOp
// Master thread: Fill IOBlockQueue, then goes to sleep
// Worker thread: pulls IOBlock from IOBlockQueue, work on it the put buffer to mOutConnector
// @return Status The status code returned
Status operator()() override;
// A print method typically used for debugging
// @param out
// @param show_all
@ -219,16 +209,12 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
Status GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) override;
private:
// Initialize Sampler, calls sampler->Init() within
// @return Status The status code returned
Status InitSampler();
// Load a tensor row according to image id
// @param row_id_type row_id - id for this tensor row
// @param std::string image_id - image id
// @param TensorRow row - image & target read into this tensor row
// @return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *row);
Status LoadTensorRow(row_id_type row_id, TensorRow *row) override;
// @param const std::string &path - path to the image file
// @param const ColDescriptor &col - contains tensor implementation and datatype
@ -241,11 +227,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
// @return Status The status code returned
Status ReadAnnotationToTensor(const std::string &path, TensorRow *row);
// @param const std::vector<uint64_t> &keys - keys in ioblock
// @param std::unique_ptr<DataBuffer> db
// @return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);
// Read image list from ImageSets
// @return Status The status code returned
Status ParseImageIds();
@ -264,18 +245,9 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
// @return Status The status code returned
void ParseNodeValue(XMLElement *bbox_node, const char *name, float *value);
// @param const std::shared_ptr<Tensor> &sample_ids - sample ids of tensor
// @param std::vector<int64_t> *keys - image id
// @return Status The status code returned
Status TraverseSampleIds(const std::shared_ptr<Tensor> &sample_ids, std::vector<int64_t> *keys);
// Called first when function is called
// @return Status The status code returned
Status LaunchThreadsAndInitOp();
// Reset dataset state
// @return Status The status code returned
Status Reset() override;
Status LaunchThreadsAndInitOp() override;
// Private function for computing the assignment of the column name map.
// @return - Status

View File

@ -154,6 +154,7 @@ if(BUILD_MINDDATA STREQUAL "full")
${MINDDATA_DIR}/engine/datasetops/map_op/cpu_map_job.cc
${MINDDATA_DIR}/engine/datasetops/source/album_op.cc
${MINDDATA_DIR}/engine/datasetops/source/mnist_op.cc
${MINDDATA_DIR}/engine/datasetops/source/mappable_leaf_op.cc
${MINDDATA_DIR}/engine/datasetops/source/io_block.cc
${MINDDATA_DIR}/engine/opt/pre/getter_pass.cc