!15979 Officially support caching over MindRecord dataset

From: @lixiachen
Reviewed-by: @nsyca,@robingrosman
Signed-off-by: @robingrosman
This commit is contained in:
mindspore-ci-bot 2021-05-07 03:39:12 +08:00 committed by Gitee
commit 34f4260cd1
28 changed files with 465 additions and 157 deletions

View File

@ -123,6 +123,7 @@ if(ENABLE_CACHE)
add_dependencies(cpp-API engine-cache-client)
add_dependencies(engine-ir-cache engine-cache-client)
add_dependencies(engine-ir-datasetops engine-cache-client)
add_dependencies(engine-ir-datasetops-source engine-cache-client)
add_dependencies(engine-opt engine-cache-client)
add_dependencies(engine-datasetops engine-cache-client)
add_dependencies(engine-perf engine-cache-client)

View File

@ -1001,32 +1001,33 @@ ManifestDataset::ManifestDataset(const std::vector<char> &dataset_file, const st
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded) {
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded);
sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded) {
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded);
sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded) {
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler.get().Parse();
nlohmann::json sample = nullptr;
if (padded_sample) {
@ -1034,13 +1035,13 @@ MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded);
sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded) {
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
@ -1048,12 +1049,13 @@ MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_f
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded);
sampler_obj, sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded) {
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
@ -1061,20 +1063,20 @@ MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_f
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded);
sampler_obj, sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded) {
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler.get().Parse();
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded);
sampler_obj, sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
#endif

View File

@ -177,9 +177,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) {
nlohmann::json padded_sample_json;
std::map<std::string, std::string> sample_bytes;
THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes));
auto minddata =
std::make_shared<MindDataNode>(dataset_file, toStringVector(columns_list),
toSamplerObj(sampler, true), padded_sample_json, num_padded);
auto minddata = std::make_shared<MindDataNode>(dataset_file, toStringVector(columns_list),
toSamplerObj(sampler, true), padded_sample_json,
num_padded, nullptr);
minddata->SetSampleBytes(&sample_bytes);
THROW_IF_ERROR(minddata->ValidateParams());
return minddata;
@ -189,9 +189,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) {
nlohmann::json padded_sample_json;
std::map<std::string, std::string> sample_bytes;
THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes));
auto minddata =
std::make_shared<MindDataNode>(toStringVector(dataset_file), toStringVector(columns_list),
toSamplerObj(sampler, true), padded_sample_json, num_padded);
auto minddata = std::make_shared<MindDataNode>(
toStringVector(dataset_file), toStringVector(columns_list), toSamplerObj(sampler, true),
padded_sample_json, num_padded, nullptr);
minddata->SetSampleBytes(&sample_bytes);
THROW_IF_ERROR(minddata->ValidateParams());
return minddata;

View File

@ -40,7 +40,7 @@ using mindrecord::ShardOperator;
using mindrecord::ShardReader;
// Builder constructor. Creates the builder object.
MindRecordOp::Builder::Builder() : build_dataset_file_({}) {
MindRecordOp::Builder::Builder() : build_dataset_file_({}), builder_sampler_(nullptr) {
// Some arguments to the MindRecordOp constructor have a default argument that is taken
// from the client config.
// The user may choose to change these values for the construction of the MindRecordOp by
@ -69,11 +69,14 @@ Status MindRecordOp::Builder::Build(std::shared_ptr<MindRecordOp> *ptr) {
}
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
if (builder_sampler_ == nullptr) {
builder_sampler_ = std::make_shared<MindRecordSamplerRT>(shard_reader.get());
}
new_mind_record_op =
std::make_shared<MindRecordOp>(build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_,
build_op_connector_queue_size_, build_columns_to_load_, build_operators_,
build_num_padded_, sample_json, build_sample_bytes_, std::move(shard_reader));
new_mind_record_op = std::make_shared<MindRecordOp>(
build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, build_op_connector_queue_size_,
build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_,
std::move(shard_reader), builder_sampler_);
RETURN_IF_NOT_OK(new_mind_record_op->Init());
*ptr = std::move(new_mind_record_op);
@ -115,9 +118,8 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::str
int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
std::unique_ptr<ShardReader> shard_reader)
: MappableLeafOp(num_mind_record_workers, op_connector_queue_size,
std::make_shared<MindRecordSamplerRT>(shard_reader.get())),
std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler)
: MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
dataset_file_(dataset_file),
load_dataset_(load_dataset),
columns_to_load_(columns_to_load),
@ -275,6 +277,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in
RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type));
std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
fetched_row->setPath(file_path);
fetched_row->setId(row_id);
}
if (tupled_buffer.empty()) return Status::OK();
if (task_type == mindrecord::TaskType::kCommonTask) {
@ -284,6 +287,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in
RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type));
std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
fetched_row->setPath(file_path);
fetched_row->setId(row_id);
}
}

View File

@ -25,6 +25,7 @@
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/data_schema.h"
@ -107,6 +108,14 @@ class MindRecordOp : public MappableLeafOp {
return *this;
}
// Setter method
// @param std::shared_ptr<Sampler> sampler
// @return Builder setter method returns reference to the builder.
Builder &SetSampler(std::shared_ptr<SamplerRT> sampler) {
builder_sampler_ = std::move(sampler);
return *this;
}
Status SanityCheck() const;
static int32_t num_mind_record_workers() { return kDefaultMindRecordWorkers; }
@ -128,6 +137,7 @@ class MindRecordOp : public MappableLeafOp {
int64_t build_num_padded_;
py::handle build_sample_;
std::map<std::string, std::string> build_sample_bytes_;
std::shared_ptr<SamplerRT> builder_sampler_;
};
// Constructor of the MindRecordOp.
@ -137,11 +147,12 @@ class MindRecordOp : public MappableLeafOp {
// @param op_connector_queue_size - The output connector queue size
// @param columns_to_load - The list of columns to use (column name)
// @param operators - ShardOperators for Shuffle, Category, Sample
// @param sampler - sampler tells MindRecordOp what to read
MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset,
int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded_,
const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes_,
std::unique_ptr<ShardReader> shard_reader);
std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler);
// Destructor
~MindRecordOp() override;

View File

@ -62,6 +62,8 @@ Status MindRecordSamplerRT::InitSampler() {
Status MindRecordSamplerRT::ResetSampler() {
// drive the shard reader reshuffle tasks to redo the sampling for another epoch
// Note that when cache is attached, this function is driven by cache lookup op rather than mindrecord op.
// Therefore, the reshuffle of tasks might happen in the middle of mindrecord's epoch
next_id_ = 0;
shard_reader_->ShuffleTask();
return Status::OK();

View File

@ -76,6 +76,10 @@ class CacheLookupNode : public DatasetNode, public SamplerObj {
/// \return Status of the node visit
Status AcceptAfter(IRNodePass *const p, bool *const modified) override;
/// \brief Sampler getter
/// \return SamplerObj of the current node
std::shared_ptr<SamplerObj> Sampler() { return sampler_; }
private:
std::shared_ptr<SamplerObj> sampler_;
std::shared_ptr<DatasetOp> lookup_op_;

View File

@ -237,7 +237,7 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
/// \return True if this node is not a data source node
const bool IsNotADataSource() const { return (mappable_ == kNotADataSource); }
/// \brief Check if this node is a descendant of an operator with cache. Currently used in leaf nodes
/// \brief Check if this node is a descendant of an operator with cache.
/// \return True if a cache-enabled operator is an ancestor of this node
const bool IsDescendantOfCache() const { return descendant_of_cache_; }
@ -247,7 +247,7 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
return node != nullptr && node->parent_ == nullptr && node->Children().empty();
}
/// \brief Mark to indicate this node is a descendant of an operator with cache. Currently used in leaf nodes
/// \brief Mark to indicate this node is a descendant of an operator with cache.
void HasCacheAbove() { descendant_of_cache_ = true; }
/// \brief Getter of the number of workers
@ -335,7 +335,8 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
enum DataSource { kNotADataSource = 0, kNonMappableSource = 1, kMappableSource = 2 };
enum DataSource mappable_;
bool nary_op_; // an indicator of whether the current node supports multiple children, true for concat/zip node
bool descendant_of_cache_;
bool descendant_of_cache_; // an indicator of whether the current node is a descendant of cache.
// Initially set to false, will set to true by the optimizer when conditions are met.
};
// MappableSourceNode represents the leaf nodes that can be randomly accessed with indexes.

View File

@ -38,8 +38,7 @@ MapNode::MapNode(std::shared_ptr<DatasetNode> child, std::vector<std::shared_ptr
output_columns_(output_columns),
project_columns_(project_columns),
DatasetNode(std::move(cache)),
callbacks_(callbacks),
under_a_cache_(false) {
callbacks_(callbacks) {
this->AddChild(child);
}
@ -68,9 +67,9 @@ Status MapNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
// This is temporary code.
// Because the randomness of its tensor operations is not known in TensorOperation form until we convert them
// to TensorOp, we need to check the randomness here.
// When TensorOperation captures the randomness behaviour, remove this code and the member "under_a_cache_"
// When TensorOperation captures the randomness behaviour, remove this code
// and the temporary code in CacheValidation pre pass in IR optimizer.
if (under_a_cache_) {
if (IsDescendantOfCache()) {
auto itr = std::find_if(tensor_ops.begin(), tensor_ops.end(), [](const auto &it) { return !it->Deterministic(); });
if (itr != tensor_ops.end()) {
RETURN_STATUS_UNEXPECTED("MapNode containing random operation is not supported as a descendant of cache.");

View File

@ -79,9 +79,6 @@ class MapNode : public DatasetNode {
/// \brief setter to set all tensor operations
void setOperations(const std::vector<std::shared_ptr<TensorOperation>> &operations);
/// \brief indicate this Map will be cached
void Cached() { under_a_cache_ = true; }
/// \brief Getter functions
/// \brief Getter of tensor operations
/// \return Vector of operations the Map node will process
@ -102,7 +99,6 @@ class MapNode : public DatasetNode {
std::vector<std::string> output_columns_;
std::vector<std::string> project_columns_;
std::vector<std::shared_ptr<DSCallback>> callbacks_;
bool under_a_cache_;
};
} // namespace dataset

View File

@ -181,7 +181,11 @@ Status CLUENode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops)
RETURN_IF_NOT_OK(clue_op->Init());
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) {
// If a global shuffle is used for Clue, it will inject a shuffle op over the Clue.
// But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built.
// This is achieved in the cache transform pass where we call MakeSimpleProducer to reset Clue's shuffle
// option to false.
if (shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

View File

@ -119,7 +119,11 @@ Status CSVNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
RETURN_IF_NOT_OK(csv_op->Init());
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) {
// If a global shuffle is used for CSV, it will inject a shuffle op over the CSV.
// But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built.
// This is achieved in the cache transform pass where we call MakeSimpleProducer to reset CSV's shuffle
// option to false.
if (shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

View File

@ -20,10 +20,13 @@
#include <memory>
#include <stack>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/datasetops/source/mindrecord_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
#include "minddata/dataset/engine/ir/datasetops/cache_lookup_node.h"
#include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/util/status.h"
@ -31,36 +34,40 @@ namespace mindspore {
namespace dataset {
MindDataNode::MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded)
: MappableSourceNode(),
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache = nullptr)
: MappableSourceNode(std::move(cache)),
dataset_file_(std::string()),
dataset_files_(dataset_files),
search_for_pattern_(false),
columns_list_(columns_list),
sampler_(sampler),
input_sampler_(sampler),
sampler_(std::make_shared<MindRecordSamplerObj>()),
padded_sample_(padded_sample),
sample_bytes_({}),
num_padded_(num_padded) {}
MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded)
: MappableSourceNode(),
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache = nullptr)
: MappableSourceNode(std::move(cache)),
dataset_file_(dataset_file),
dataset_files_({}),
search_for_pattern_(true),
columns_list_(columns_list),
sampler_(sampler),
input_sampler_(sampler),
sampler_(std::make_shared<MindRecordSamplerObj>()),
padded_sample_(padded_sample),
sample_bytes_({}),
num_padded_(num_padded) {}
std::shared_ptr<DatasetNode> MindDataNode::Copy() {
std::shared_ptr<MindDataNode> node;
std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
std::shared_ptr<SamplerObj> sampler = (input_sampler_ == nullptr) ? nullptr : input_sampler_->SamplerCopy();
if (dataset_files_.empty()) {
node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_);
node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, cache_);
} else {
node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_);
node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, cache_);
}
node->SetSampleBytes(&sample_bytes_);
return node;
@ -82,7 +89,7 @@ Status MindDataNode::ValidateParams() {
search_for_pattern_ ? std::vector<std::string>{dataset_file_} : dataset_files_;
RETURN_IF_NOT_OK(ValidateDatasetFilesParam("MindDataNode", dataset_file_vec));
RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", sampler_));
RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", input_sampler_));
if (!columns_list_.empty()) {
RETURN_IF_NOT_OK(ValidateDatasetColumnParam("MindDataNode", "columns_list", columns_list_));
@ -153,22 +160,46 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerO
void MindDataNode::SetSampleBytes(std::map<std::string, std::string> *sample_bytes) { sample_bytes_ = *sample_bytes; }
Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators_, num_padded_));
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_));
std::shared_ptr<SamplerRT> sampler_rt = nullptr;
// Build the sampler IR into a runtime sampler.
// This will also create a shard reader object, saved in this node's sampler_.
RETURN_IF_NOT_OK(sampler_->SamplerBuild(&sampler_rt));
// Now we need to acquire the newly created shard reader from this node's sampler_.
// There are two cases:
// 1. If this node is cached, now after cache transform pass, its sampler_ has already been replaced by cache lookup
// node, and we should find the shard reader from cache lookup node's sampler_.
// 2. If this node is not cached, just acquire the shard reader from this node's sampler_.
std::unique_ptr<ShardReader> shard_reader;
if (IsDescendantOfCache()) {
auto cache_lookup_sampler = std::dynamic_pointer_cast<CacheLookupNode>(sampler_);
CHECK_FAIL_RETURN_UNEXPECTED(cache_lookup_sampler != nullptr,
"Internal error. MindDataNode is cached, its sampler should be cache lookup node");
auto mr_sampler = std::dynamic_pointer_cast<MindRecordSamplerObj>(cache_lookup_sampler->Sampler());
CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr,
"Internal error. CacheLookupNode's sampler should be a MindRecordSamplerObj object");
RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader));
} else {
auto mr_sampler = std::dynamic_pointer_cast<MindRecordSamplerObj>(sampler_);
CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr,
"Internal error. MindDataNode's sampler should be a MindRecordSamplerObj object");
RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader));
}
std::shared_ptr<MindRecordOp> mindrecord_op;
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
// If pass a string to MindData(), it will be treated as a pattern to search for matched files,
// else if pass a vector to MindData(), it will be treated as specified files to be read
if (search_for_pattern_) {
std::vector<std::string> dataset_file_vec_ = {dataset_file_};
mindrecord_op = std::make_shared<MindRecordOp>(num_workers_, dataset_file_vec_, search_for_pattern_,
connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader));
mindrecord_op = std::make_shared<MindRecordOp>(
num_workers_, dataset_file_vec_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt));
} else {
mindrecord_op = std::make_shared<MindRecordOp>(num_workers_, dataset_files_, search_for_pattern_,
connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader));
mindrecord_op = std::make_shared<MindRecordOp>(
num_workers_, dataset_files_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt));
}
RETURN_IF_NOT_OK(mindrecord_op->Init());
@ -181,7 +212,7 @@ Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
// Get the shard id of node
Status MindDataNode::GetShardId(int32_t *shard_id) {
*shard_id = sampler_->ShardId();
*shard_id = input_sampler_->ShardId();
return Status::OK();
}
@ -195,7 +226,7 @@ Status MindDataNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &si
}
int64_t num_rows = -1;
std::vector<std::shared_ptr<ShardOperator>> operators;
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators, num_padded_));
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_));
if (search_for_pattern_) {
dataset_files_ = {dataset_file_};

View File

@ -32,11 +32,13 @@ class MindDataNode : public MappableSourceNode {
public:
/// \brief Constructor
MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded);
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache);
/// \brief Constructor
MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded);
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache);
/// \brief Destructor
~MindDataNode() = default;
@ -109,7 +111,9 @@ class MindDataNode : public MappableSourceNode {
std::vector<std::string> dataset_files_; // search_for_pattern_ will be false in this mode
bool search_for_pattern_;
std::vector<std::string> columns_list_;
std::shared_ptr<SamplerObj> sampler_;
std::shared_ptr<SamplerObj> input_sampler_; // The sampler from users input, will be used to create a set of shard
// operators.
std::shared_ptr<SamplerObj> sampler_; // An auto-created sampler, IR of runtime MindRecordSamplerRT sampler
nlohmann::json padded_sample_;
std::map<std::string, std::string> sample_bytes_; // enable in python
int64_t num_padded_;

View File

@ -11,6 +11,7 @@ set(DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES
subset_random_sampler_ir.cc
subset_sampler_ir.cc
weighted_random_sampler_ir.cc
mindrecord_sampler_ir.cc
)
add_library(engine-ir-datasetops-source-samplers OBJECT ${DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES})

View File

@ -0,0 +1,56 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h"
#include <memory>
#include <utility>
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
#include "minddata/mindrecord/include/shard_reader.h"
#endif
namespace mindspore {
namespace dataset {
#ifndef ENABLE_ANDROID
// This function not only creates a runtime sampler object, but also creates a ShardReader,
// which will also be needed to build a runtime MindRecordOp
// (cannot add another output parameter because it has to override base class's function)
Status MindRecordSamplerObj::SamplerBuild(std::shared_ptr<SamplerRT> *sampler) {
shard_reader_ = std::make_unique<mindrecord::ShardReader>();
*sampler = std::make_shared<MindRecordSamplerRT>(shard_reader_.get());
return Status::OK();
}
std::shared_ptr<SamplerObj> MindRecordSamplerObj::SamplerCopy() {
auto sampler = std::make_shared<MindRecordSamplerObj>();
return sampler;
}
// Function to acquire the unique pointer of the newly created ShardReader object
// Note this function can only be called after SamplerBuild is finished, and can only be called once. Otherwise this
// function will return error status.
Status MindRecordSamplerObj::GetShardReader(std::unique_ptr<mindrecord::ShardReader> *shard_reader) {
CHECK_FAIL_RETURN_UNEXPECTED(shard_reader_ != nullptr, "Internal error. Attempt to get an empty shard reader.");
*shard_reader = std::move(shard_reader_);
return Status::OK();
}
#endif
} // namespace dataset
} // namespace mindspore

View File

@ -0,0 +1,67 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_MINDRECORD_SAMPLER_IR_H
#define MINDSPORE_MINDRECORD_SAMPLER_IR_H
#include <memory>
#include "minddata/dataset/engine/ir/datasetops/source/samplers/samplers_ir.h"
#include "include/api/status.h"
#ifndef ENABLE_ANDROID
#include "minddata/mindrecord/include/shard_reader.h"
#endif
namespace mindspore {
namespace dataset {
#ifndef ENABLE_ANDROID
class MindRecordSamplerObj : public SamplerObj {
public:
/// \brief Constructor
MindRecordSamplerObj() : shard_reader_(nullptr) {}
/// \brief Destructor
~MindRecordSamplerObj() = default;
/// \brief Convert a MindRecordSamplerObj into a runtime MindRecordSamplerRT object
/// Note that this function not only creates a runtime sampler object, but also creates a ShardReader,
/// which will also be needed to build a runtime MindRecordOp
/// \param[out] sampler Shared pointer to the newly created runtime sampler
/// \return The Status code of the function. It returns OK status if sampler is created successfully.
Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override;
/// \brief Function to copy a MindRecordSamplerObj
/// \return Shared pointer to the newly created SamplerObj
std::shared_ptr<SamplerObj> SamplerCopy() override;
/// \brief Function for parameter check. This class requires no input parameter.
/// \return The Status code of the function. This function always return OK status.
Status ValidateParams() override { return Status::OK(); }
/// \brief Function to acquire the unique pointer of the newly created ShardReader object
/// Note that this function can only be called after SamplerBuild is called, and can only be called once
/// \param shard_reader Unique pointer to the newly created ShardReader object
/// \return The Status code of the function. It returns OK status if acquired a non-empty ShardReader object.
Status GetShardReader(std::unique_ptr<mindrecord::ShardReader> *shard_reader);
private:
std::unique_ptr<mindrecord::ShardReader> shard_reader_;
};
#endif
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_MINDRECORD_SAMPLER_IR_H

View File

@ -87,7 +87,11 @@ Status TextFileNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
sorted_dataset_files, connector_que_size_, shuffle_files, num_shards_, shard_id_);
RETURN_IF_NOT_OK(text_file_op->Init());
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) {
// If a global shuffle is used for TextFile, it will inject a shuffle op over the TextFile.
// But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built.
// This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TextFile's shuffle
// option to false.
if (shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

View File

@ -129,7 +129,11 @@ Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
RETURN_IF_NOT_OK(tf_reader_op->Init());
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) {
// If a global shuffle is used for TFRecord, it will inject a shuffle op over the TFRecord.
// But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built.
// This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TFRecord's shuffle
// option to false.
if (shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;

View File

@ -94,6 +94,9 @@ Status CacheTransformPass::CachePass::Visit(std::shared_ptr<NonMappableSourceNod
}
#endif
// Almost the same with NonMappableSourceNode's Visit, only this one is not guarded by the compiler
// directive #ifndef ENABLE_ANDROID, also and there is no need to call MakeSimpleProducer() because
// RandomOp doesn't support sampling or sharding
Status CacheTransformPass::CachePass::Visit(std::shared_ptr<RandomNode> node, bool *const modified) {
if (node->IsCached()) {
MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree.";
@ -137,11 +140,25 @@ Status CacheTransformPass::CachePass::Visit(std::shared_ptr<MappableSourceNode>
}
#ifndef ENABLE_ANDROID
// Perform leaf node cache transform identification
// Almost the same with MappableSourceNode's Visit, only in this one we also marked this node's descendant_of_cache_
// field to true. Later when building, MindDataNode will take different actions based on this information.
Status CacheTransformPass::CachePass::Visit(std::shared_ptr<MindDataNode> node, bool *const modified) {
if (node->IsCached() || is_caching_) {
return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__,
"There is currently no support for MindRecordOp under cache.");
if (node->IsCached()) {
MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree.";
is_caching_ = true;
}
// Cache might also be injected to the non-leaf node upper in the tree, so is_caching_ might also be set to true
// by the other Visit() with DatasetNode argument
if (is_caching_) {
node->HasCacheAbove();
MS_LOG(DEBUG) << "Cache transform pass: Mappable leaf in a cache descendant tree detected";
// If a leaf has already been assigned, then we have more than one leaf inside this cache descendant tree.
if (leaf_node_) {
return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__,
"There is currently no support for multiple leaf nodes under cache.");
}
// If we are a leaf in the caching path, then save this leaf
leaf_node_ = node;
}
return Status::OK();
}

View File

@ -121,7 +121,7 @@ Status CacheValidationPass::Visit(std::shared_ptr<MapNode> node, bool *const mod
// to TensorOp, we need to check the randomness in MapNode::Build().
// By setting this MapNode is under a cache, we will check the randomness of its tensor operations without the need
// to walk the IR tree again.
node->Cached();
node->HasCacheAbove();
auto tfuncs = node->TensorOperations();
for (size_t i = 0; i < tfuncs.size(); i++) {

View File

@ -1247,22 +1247,25 @@ class MindDataDataset : public Dataset {
public:
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded);
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded);
const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded);
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const std::shared_ptr<Sampler> &sampler,
const nlohmann::json *padded_sample, int64_t num_padded);
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded);
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded);
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
~MindDataDataset() = default;
};
@ -1276,13 +1279,14 @@ class MindDataDataset : public Dataset {
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(
const std::string &dataset_file, const std::vector<std::string> &columns_list = {},
const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr,
int64_t num_padded = 0) {
int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
/// \brief Function to create a MindDataDataset
@ -1293,12 +1297,14 @@ inline std::shared_ptr<MindDataDataset> MindData(
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file,
const std::vector<std::string> &columns_list, const Sampler *sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) {
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
/// \brief Function to create a MindDataDataset
@ -1309,13 +1315,15 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file,
const std::vector<std::string> &columns_list,
const std::reference_wrapper<Sampler> sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) {
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
/// \brief Function to create a MindDataDataset
@ -1327,13 +1335,14 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(
const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list = {},
const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr,
int64_t num_padded = 0) {
int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
/// \brief Function to create a MindDataDataset
@ -1343,12 +1352,14 @@ inline std::shared_ptr<MindDataDataset> MindData(
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> &dataset_files,
const std::vector<std::string> &columns_list, const Sampler *sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) {
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
/// \brief Function to create a MindDataDataset
@ -1358,13 +1369,15 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string>
/// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler.
/// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list.
/// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards.
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MindDataDataset
inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> &dataset_files,
const std::vector<std::string> &columns_list,
const std::reference_wrapper<Sampler> sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) {
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded);
padded_sample, num_padded, cache);
}
class MnistDataset : public Dataset {

View File

@ -63,10 +63,12 @@ void ShardTaskList::TaskListSwap(ShardTaskList &orig_tasks, ShardTaskList &new_t
// When swapping, if the orig_tasks contains fields that need to be preserved after the swap, then swapping with a
// new_tasks that does not have those fields will result in clobbering/losing the data after the swap.
// The task_list_ should not be lost/clobbered.
new_tasks.task_list_ = std::move(orig_tasks.task_list_);
// This function can be called in the middle of mindrecord's epoch, when orig_tasks.task_list_ is still being
// used by mindrecord op's worker threads. So don't touch its task_list_ since this field should be preserved anyways.
// Now, it's safe to drive the swap.
std::swap(orig_tasks, new_tasks);
std::swap(orig_tasks.categories, new_tasks.categories);
std::swap(orig_tasks.permutation_, new_tasks.permutation_);
std::swap(orig_tasks.sample_ids_, new_tasks.sample_ids_);
}
void ShardTaskList::PopBack() { task_list_.pop_back(); }

View File

@ -3052,6 +3052,8 @@ class MindDataset(MappableDataset):
plus num_padded should be divisible by num_shards.
num_samples (int, optional): The number of samples to be included in the dataset
(default=None, all samples).
cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing.
(default=None, which means no cache is used).
Raises:
ValueError: If num_shards is specified but shard_id is None.
@ -3068,9 +3070,9 @@ class MindDataset(MappableDataset):
@check_minddataset
def __init__(self, dataset_file, columns_list=None, num_parallel_workers=None, shuffle=None, num_shards=None,
shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None):
shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None, cache=None):
super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples,
shuffle=shuffle, num_shards=num_shards, shard_id=shard_id)
shuffle=shuffle, num_shards=num_shards, shard_id=shard_id, cache=cache)
if isinstance(dataset_file, list):
self.load_dataset = False
else:

View File

@ -32,7 +32,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiSamplerNull) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true, "127.0.0.1", 50053, 1, 1);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false, "127.0.0.1", 50053, 1, 1);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it
@ -52,7 +52,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiNestedCache) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it
@ -80,7 +80,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it
@ -121,7 +121,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCocoCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a Coco Dataset, this folder_path has 6 images in it
@ -164,7 +164,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMnistCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a Mnist Dataset
@ -205,7 +205,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCelebaCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a CelebA Dataset, this folder_path has 4 records in it
@ -247,7 +247,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheManifestCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a Manifest Dataset, this file_path has 2 records in it
@ -288,7 +288,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar10CApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a Cifar10 Dataset
@ -329,7 +329,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar100CApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a Cifar100 Dataset
@ -370,7 +370,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheVocCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a VOC Dataset, this folder_path has 9 records in it
@ -412,7 +412,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
std::string folder_path = datasets_root_path_ + "/testAlbum/images";
@ -449,12 +449,50 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) {
iter->Stop();
}
TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMindRecordCApi) {
session_id_type env_session;
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a MindData Dataset
// Pass one mindrecord shard file to parse dataset info, and search for other mindrecord files with same dataset info,
// thus all records in imagenet.mindrecord0 ~ imagenet.mindrecord3 will be read
std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0";
// Create a MindRecord Dataset, 20 records in it
std::shared_ptr<Dataset> ds = MindData(file_path, {}, std::make_shared<RandomSampler>(), nullptr, 0, some_cache);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
uint64_t i = 0;
while (row.size() != 0) {
i++;
ASSERT_OK(iter->GetNextRow(&row));
}
EXPECT_EQ(i, 20);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestCacheOp, DISABLED_TestCacheRandomDataCApi) {
session_id_type env_session;
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a RandomDataset
@ -496,7 +534,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi1) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a TFRecord Dataset, this file_path has 3 records in it
@ -539,7 +577,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi2) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a TFRecord Dataset, this file_path has 3 records in it
@ -590,7 +628,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi3) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a TFRecord Dataset, this file_path has 3 records in it
@ -637,7 +675,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTextfileCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a TextFile Dataset, this file_path has 3 records in it
@ -680,7 +718,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCsvCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a CSV Dataset, this file_path has 3 records in it
@ -724,7 +762,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create a CLUE Dataset, this file_path has 3 records in it
@ -769,7 +807,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare1) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it
@ -821,7 +859,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare2) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it
@ -874,7 +912,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) {
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
EXPECT_NE(some_cache, nullptr);
// Create an ImageFolder Dataset, this folder_path only has 2 images in it

View File

@ -29,6 +29,11 @@ UT_TEST_DIR="${BUILD_PATH}/mindspore/tests/ut/cpp"
DateStamp=$(date +%Y%m%d_%H%M%S);
CPP_TEST_LOG_OUTPUT="/tmp/ut_tests_cache_${DateStamp}.log"
## prepare data for dataset & mindrecord
cp -fr ${PROJECT_PATH}/tests/ut/data ${UT_TEST_DIR}
## prepare album dataset, uses absolute path so has to be generated
python ${UT_TEST_DIR}/data/dataset/testAlbum/gen_json.py
# start cache server with a spilling path to be used for all tests
cmd="${CACHE_ADMIN} --start -s /tmp"
CacheAdminCmd "${cmd}" 0

View File

@ -121,6 +121,9 @@ HandleRcExit $? 0 0
PytestCmd "test_cache_map.py" "test_cache_map_voc" 1
HandleRcExit $? 0 0
PytestCmd "test_cache_map.py" "test_cache_map_mindrecord" 1
HandleRcExit $? 0 0
PytestCmd "test_cache_map.py" "test_cache_map_python_sampler" 1
HandleRcExit $? 0 0

View File

@ -413,47 +413,6 @@ def test_cache_map_failure5():
logger.info('test_cache_failure5 Ended.\n')
@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
def test_cache_map_failure6():
"""
Test no-cache-supporting MindRecord leaf with Map under cache (failure)
repeat
|
Cache
|
Map(resize)
|
MindRecord
"""
logger.info("Test cache failure 6")
if "SESSION_ID" in os.environ:
session_id = int(os.environ['SESSION_ID'])
else:
raise RuntimeError("Testcase requires SESSION_ID environment variable")
some_cache = ds.DatasetCache(session_id=session_id, size=0)
columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
num_readers = 1
# The dataset has 5 records
data = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, num_readers)
resize_op = c_vision.Resize((224, 224))
data = data.map(input_columns=["img_data"], operations=resize_op, cache=some_cache)
data = data.repeat(4)
with pytest.raises(RuntimeError) as e:
num_iter = 0
for _ in data.create_dict_iterator():
num_iter += 1
assert "There is currently no support for MindRecordOp under cache" in str(e.value)
assert num_iter == 0
logger.info('test_cache_failure6 Ended.\n')
@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
def test_cache_map_failure7():
"""
@ -1997,6 +1956,79 @@ class ReverseSampler(ds.Sampler):
yield i
@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
def test_cache_map_mindrecord1():
"""
Test mappable mindrecord leaf with cache op right over the leaf
cache
|
MindRecord
"""
logger.info("Test cache map mindrecord1")
if "SESSION_ID" in os.environ:
session_id = int(os.environ['SESSION_ID'])
else:
session_id = 1
some_cache = ds.DatasetCache(session_id=session_id, size=0)
# This dataset has 5 records
columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, cache=some_cache)
num_epoch = 4
iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
epoch_count = 0
for _ in range(num_epoch):
assert sum([1 for _ in iter1]) == 5
epoch_count += 1
assert epoch_count == num_epoch
logger.info("test_cache_map_mindrecord1 Ended.\n")
@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
def test_cache_map_mindrecord2():
"""
Test mappable mindrecord leaf with the cache op later in the tree above the map(decode)
cache
|
Map(decode)
|
MindRecord
"""
logger.info("Test cache map mindrecord2")
if "SESSION_ID" in os.environ:
session_id = int(os.environ['SESSION_ID'])
else:
session_id = 1
some_cache = ds.DatasetCache(session_id=session_id, size=0)
# This dataset has 5 records
columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list)
decode_op = c_vision.Decode()
ds1 = ds1.map(input_columns=["img_data"], operations=decode_op, cache=some_cache)
num_epoch = 4
iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
epoch_count = 0
for _ in range(num_epoch):
assert sum([1 for _ in iter1]) == 5
epoch_count += 1
assert epoch_count == num_epoch
logger.info("test_cache_map_mindrecord2 Ended.\n")
@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
def test_cache_map_python_sampler1():
"""
@ -2169,7 +2201,6 @@ if __name__ == '__main__':
test_cache_map_failure3()
test_cache_map_failure4()
test_cache_map_failure5()
test_cache_map_failure6()
test_cache_map_failure7()
test_cache_map_failure8()
test_cache_map_failure9()
@ -2210,6 +2241,8 @@ if __name__ == '__main__':
test_cache_map_cifar4()
test_cache_map_voc1()
test_cache_map_voc2()
test_cache_map_mindrecord1()
test_cache_map_mindrecord2()
test_cache_map_python_sampler1()
test_cache_map_python_sampler2()
test_cache_map_nested_repeat()