forked from mindspore-Ecosystem/mindspore
Stage 2
This commit is contained in:
parent
55e165c50e
commit
adb9803594
|
@ -237,10 +237,6 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
|
|||
std::set<int32_t> *pad_cols, std::vector<std::shared_ptr<Tensor>> *pad_vals,
|
||||
std::vector<std::vector<dsize_t>> *pad_shapes);
|
||||
|
||||
// the number of thread pulling from the mOutConnector of the Op below
|
||||
// @return int32_t, 1
|
||||
int32_t NumConsumers() const override { return 1; }
|
||||
|
||||
// get the batch size for next batch
|
||||
// @return Status The status code returned
|
||||
Status GetBatchSize(int32_t *batch_size, CBatchInfo info);
|
||||
|
|
|
@ -69,14 +69,6 @@ class BuildSentencePieceVocabOp : public PipelineOp {
|
|||
|
||||
Status operator()() override;
|
||||
|
||||
// Getter
|
||||
// @return the number of workers
|
||||
int32_t NumProducers() const override { return 1; }
|
||||
|
||||
// Getter
|
||||
// @return the number of threads consuming from the previous Connector
|
||||
int32_t NumConsumers() const override { return 1; }
|
||||
|
||||
Status Reset() override { RETURN_STATUS_UNEXPECTED("Reset shouldn't be called in BuildSentencePieceVocabOp"); }
|
||||
|
||||
std::string Name() const override { return kBuildSentencePieceVocabOp; }
|
||||
|
|
|
@ -66,14 +66,6 @@ class BuildVocabOp : public ParallelOp<TensorRow, TensorRow> {
|
|||
|
||||
Status operator()() override;
|
||||
|
||||
/// Getter
|
||||
/// @return the number of workers
|
||||
int32_t NumProducers() const override { return 1; }
|
||||
|
||||
/// Getter
|
||||
/// @return the number of threads consuming from the previous Connector
|
||||
int32_t NumConsumers() const override { return 1; }
|
||||
|
||||
Status Reset() override { RETURN_STATUS_UNEXPECTED("Reset shouldn't be called in BuildVocabOp"); }
|
||||
|
||||
private:
|
||||
|
|
|
@ -193,26 +193,5 @@ Status ConcatOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe
|
|||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t ConcatOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Return operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ConcatOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -73,8 +73,6 @@ class ConcatOp : public PipelineOp {
|
|||
Status GetNumClasses(int64_t *num_classes) override;
|
||||
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
int32_t NumConsumers() const override;
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
/// Check if the current sample will be taken or dropped
|
||||
/// \return bool
|
||||
|
|
|
@ -209,14 +209,6 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
|
|||
// \return The number of workers in this op
|
||||
virtual int32_t NumWorkers() const = 0;
|
||||
|
||||
// \brief Getter function
|
||||
// \return The number of threads consuming from previous op.
|
||||
virtual int32_t NumConsumers() const = 0;
|
||||
|
||||
// \brief Getter function
|
||||
// \return The number of threads producing to the output connector.
|
||||
virtual int32_t NumProducers() const = 0;
|
||||
|
||||
// \brief Getter function
|
||||
// \return T/F if this is an inlined operator
|
||||
bool inlined() const { return (oc_queue_size_ == 0); }
|
||||
|
|
|
@ -159,7 +159,5 @@ Status FilterOp::InvokePredicateFunc(const TensorRow &input, bool *out_predicate
|
|||
|
||||
return Status(StatusCode::kSuccess, "FilterOp predicate func call succeed");
|
||||
}
|
||||
int32_t FilterOp::NumConsumers() const { return 1; }
|
||||
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -69,8 +69,6 @@ class FilterOp : public ParallelOp<TensorRow, TensorRow> {
|
|||
// @return Name of the current Op
|
||||
std::string Name() const override { return kFilterOp; }
|
||||
|
||||
int32_t NumConsumers() const override;
|
||||
|
||||
private:
|
||||
// predicate_func python callable which returns a boolean value.
|
||||
std::shared_ptr<TensorOp> predicate_func_;
|
||||
|
|
|
@ -45,12 +45,6 @@ MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std
|
|||
}
|
||||
}
|
||||
|
||||
// The number of threads consuming data from previous op's output Connector.
|
||||
int32_t MapOp::NumConsumers() const {
|
||||
// When Performance Mode is on, there is only one thread consuming from the previous Connector.
|
||||
return 1;
|
||||
}
|
||||
|
||||
// A print method typically used for debugging
|
||||
void MapOp::Print(std::ostream &out, bool show_all) const {
|
||||
if (!show_all) {
|
||||
|
|
|
@ -107,10 +107,6 @@ class MapOp : public ParallelOp<std::unique_ptr<MapWorkerJob>, TensorRow> {
|
|||
// @return Status The status code returned
|
||||
Status operator()() override;
|
||||
|
||||
// Getter
|
||||
// @return the number of threads consuming data from previous op's output Connector.
|
||||
int32_t NumConsumers() const override;
|
||||
|
||||
// Op name getter
|
||||
// @return Name of the current Op
|
||||
std::string Name() const override { return kMapOp; }
|
||||
|
|
|
@ -79,18 +79,6 @@ class ParallelOp : public DatasetOp {
|
|||
|
||||
int32_t NumWorkers() const override { return num_workers_; }
|
||||
|
||||
// Getter
|
||||
// @return the number of threads consuming from the previous Connector
|
||||
int32_t NumConsumers() const override { return num_workers_; }
|
||||
|
||||
// Getter
|
||||
// @return the number of producers pushing to the output Connector
|
||||
// @notes The number of producers is commonly the same as number of workers, except in the case
|
||||
// when a worker connector is set up. In that case, there are n workers, and a single master
|
||||
// such that only 1 thread is a producer rather than the n workers.
|
||||
// @return the number of producers
|
||||
int32_t NumProducers() const override { return num_producers_; }
|
||||
|
||||
protected:
|
||||
/// Interface for derived classes to implement. All derived classes must provide the entry
|
||||
/// function with the main execution loop for worker threads.
|
||||
|
@ -140,7 +128,6 @@ class ParallelOp : public DatasetOp {
|
|||
|
||||
/// The number of worker threads
|
||||
int32_t num_workers_;
|
||||
int32_t num_producers_; // The number of threads pushing to the out_connector_
|
||||
/// The size of input/output worker queeus
|
||||
int32_t worker_connector_size_;
|
||||
/// queues to hold the input rows to workers
|
||||
|
|
|
@ -57,14 +57,6 @@ class PipelineOp : public DatasetOp {
|
|||
// @return The number of workers inside this op. Pipeline ops only have a single worker.
|
||||
int32_t NumWorkers() const override { return 1; }
|
||||
|
||||
// Getter
|
||||
// @return the number of threads consuming from the previous Connector
|
||||
int32_t NumConsumers() const override { return 1; }
|
||||
|
||||
// Getter
|
||||
// @return The number of threads that push data to the output connector
|
||||
int32_t NumProducers() const override { return 1; }
|
||||
|
||||
protected:
|
||||
// *******************************************************************************
|
||||
// I'm predicting there will be common arguments or functionality for pipeline ops,
|
||||
|
|
|
@ -76,27 +76,6 @@ TensorRow ProjectOp::Project(const TensorRow &row) {
|
|||
// ensure that it is not called by mistake (it will generate an error).
|
||||
Status ProjectOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. ProjectOp is an inlined operator."); }
|
||||
|
||||
int32_t ProjectOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Project operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Project operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ProjectOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Project operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
|
||||
Status ProjectOp::EoeReceived(int32_t worker_id) {
|
||||
state_ = OpState::kDeOpIdle;
|
||||
return Status::OK();
|
||||
|
|
|
@ -61,14 +61,6 @@ class ProjectOp : public PipelineOp {
|
|||
// @param worker_id - The worker id
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
|
||||
// Base-class override. Return the number of workers in the first parent.
|
||||
// @param workerId - The worker id
|
||||
int32_t NumConsumers() const override;
|
||||
|
||||
// Base-class override. Return the number of producers in the first child.
|
||||
// @param workerId - The worker id
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
// Base-class override for special eoe handler.
|
||||
// Inline operators must override this because there is no connector to push eoe onto.
|
||||
// @return Status The status code returned
|
||||
|
|
|
@ -129,26 +129,5 @@ void RenameOp::Print(std::ostream &out, // In: The output stream to print t
|
|||
out << "\n\n";
|
||||
}
|
||||
}
|
||||
|
||||
int32_t RenameOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Rename operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Rename operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t RenameOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Rename operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -63,8 +63,6 @@ class RenameOp : public PipelineOp {
|
|||
// @param row - output pointer to the projected row.
|
||||
// @param worker_id - The worker id
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
int32_t NumConsumers() const override;
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
protected:
|
||||
// Rename core functionality
|
||||
|
|
|
@ -115,18 +115,6 @@ Status RepeatOp::EofReceived(int32_t worker_id) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t RepeatOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Repeat operator, no parent node, assuming it's root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Repeat operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
// Drive reset actions if needed
|
||||
Status RepeatOp::Reset() {
|
||||
// If there's nested repeats, an ascendant repeat may have ourself listed as an eoe op.
|
||||
|
@ -140,15 +128,6 @@ Status RepeatOp::Reset() {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t RepeatOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Repeat operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
|
||||
int64_t RepeatOp::GetTreeRepeatCount() { return num_repeats_; }
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -77,14 +77,6 @@ class RepeatOp : public PipelineOp {
|
|||
// @param worker_id - The worker id
|
||||
Status EofReceived(int32_t worker_id) override;
|
||||
|
||||
// Base-class override. Return the number of workers in the first parent.
|
||||
// @param workerId - The worker id
|
||||
int32_t NumConsumers() const override;
|
||||
|
||||
// Base-class override. Return the number of producers in the first child.
|
||||
// @param workerId - The worker id
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
// Op name getter
|
||||
// @return Name of the current Op
|
||||
std::string Name() const override { return kRepeatOp; }
|
||||
|
|
|
@ -65,27 +65,5 @@ Status SkipOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe)
|
|||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t SkipOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Return operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t SkipOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -49,8 +49,6 @@ class SkipOp : public PipelineOp {
|
|||
// @return Name of the current Op
|
||||
std::string Name() const override { return kSkipOp; }
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
int32_t NumConsumers() const override;
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
private:
|
||||
int32_t max_skips_; // The number of skips that the user requested
|
||||
|
|
|
@ -68,26 +68,5 @@ Status TakeOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe)
|
|||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t TakeOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Return operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t TakeOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -59,8 +59,6 @@ class TakeOp : public PipelineOp {
|
|||
std::string Name() const override { return kTakeOp; }
|
||||
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
int32_t NumConsumers() const override;
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
private:
|
||||
int32_t max_takes_; // The number of takes that the user requested
|
||||
|
|
|
@ -131,26 +131,5 @@ Status ZipOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
|
|||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int32_t ZipOp::NumConsumers() const {
|
||||
if (parent_.empty()) {
|
||||
MS_LOG(DEBUG) << "Return operator, no parent node, assuming it's the root and returning 1.";
|
||||
return 1;
|
||||
} else if (parent_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to the first parent is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return parent_[0]->NumConsumers();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ZipOp::NumProducers() const {
|
||||
if (child_.empty() || child_[0] == nullptr) {
|
||||
MS_LOG(DEBUG) << "Return operator, pointer to child node is null. Returning 0.";
|
||||
return 0;
|
||||
} else {
|
||||
return child_[0]->NumProducers();
|
||||
}
|
||||
}
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -65,8 +65,6 @@ class ZipOp : public PipelineOp {
|
|||
std::string Name() const override { return kZipOp; }
|
||||
|
||||
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
|
||||
int32_t NumConsumers() const override;
|
||||
int32_t NumProducers() const override;
|
||||
|
||||
private:
|
||||
// Special handle case where an empty row has been received from child iterator
|
||||
|
|
Loading…
Reference in New Issue