forked from mindspore-Ecosystem/mindspore
!2321 Connector throughput performance metric
Merge pull request !2321 from Alexey_Shevlyakov/connector_throughput_dev
Commit: 9b4399480a
@@ -102,8 +102,10 @@ class Connector {
      RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; }));
      RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
      pop_from_ = (pop_from_ + 1) % num_producers_;
      out_buffers_count_++;
      expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
    }

    cv_.NotifyAll();
    return Status::OK();
  }

@@ -119,6 +121,8 @@ class Connector {
    return (queues_[worker_id]->Add(el));
  }

  auto out_buffers_count() const { return out_buffers_count_.load(); }

  // Add an element into the DbConnector without the overhead of synchronization.
  // It may block when the internal queue is full.
  // The element passed to this function will be forwarded into the internal queue.

@@ -138,6 +142,7 @@ class Connector {
    }
    expect_consumer_ = 0;
    pop_from_ = 0;
    out_buffers_count_ = 0;
    MS_LOG(DEBUG) << "Connector counters reset.";
  }

@@ -198,6 +203,7 @@ class Connector {
  // Used in Pop() when a thread calls Pop() but is not the expect_consumer_.
  std::mutex m_;
  CondVar cv_;
  std::atomic<std::int64_t> out_buffers_count_ = 0;
};
} // namespace dataset
} // namespace mindspore
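The new out_buffers_count_ member is an atomic counter bumped on every successful Pop(), so a monitoring thread can read it through out_buffers_count() without taking the connector's mutex and turn successive readings into a rate. A minimal sketch of that idea (the helper name and polling interval are illustrative, not part of this patch):

#include <chrono>
#include <thread>

// Hedged sketch: derive a rate from two reads of Connector::out_buffers_count().
template <typename ConnectorT>
double SampleRatePerSec(const ConnectorT &connector, std::chrono::milliseconds interval) {
  auto before = connector.out_buffers_count();  // monotonically increasing within a run
  std::this_thread::sleep_for(interval);        // wait one sampling period
  auto after = connector.out_buffers_count();
  double seconds = std::chrono::duration<double>(interval).count();
  return (after - before) / seconds;            // buffers produced per second
}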
@@ -222,6 +222,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {

  // Getter function
  // @return connector size of current op

  int32_t ConnectorSize() const {
    if (!inlined()) {
      return out_connector_->size();
@@ -230,6 +231,10 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
    return ChildOpConnectorSize();
  }

  int64_t ConnectorOutBufferCount() const {
    return out_connector_ == nullptr ? int64_t(-1) : static_cast<int64_t>(out_connector_->out_buffers_count());
  }

  // Getter function
  // @return connector capacity of current op
  int32_t ConnectorCapacity() const {
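ConnectorOutBufferCount() returns -1 when the op is inlined and has no output connector, so consumers of the metric must treat negative values as "not applicable" rather than as a real count. A hedged sketch of that check (the aggregating helper is illustrative, not part of the patch):

#include <cstdint>
#include <memory>
#include <vector>

// Hedged sketch: skip ops without an output connector when aggregating counts.
int64_t AccumulateBufferCounts(const std::vector<std::shared_ptr<DatasetOp>> &ops) {
  int64_t total = 0;
  for (const auto &op : ops) {
    int64_t count = op->ConnectorOutBufferCount();
    if (count < 0) continue;  // -1 means the op is inlined and has no connector
    total += count;
  }
  return total;
}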
@@ -83,6 +83,7 @@ class DbConnector : public Connector<std::unique_ptr<DataBuffer>> {
        expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
      }
    }
    out_buffers_count_++;
    cv_.NotifyAll();
    return Status::OK();
  }
@@ -88,8 +88,10 @@ class ExecutionTree {

    bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; }

    int32_t NumNodes() { return nodes_.size(); }

   private:
    int ind_;      // the cur node our Iterator points to
    int32_t ind_;  // the cur node our Iterator points to
    std::vector<std::shared_ptr<DatasetOp>> nodes_;  // store the nodes in post order
    void PostOrderTraverse(const std::shared_ptr<DatasetOp> &);
  };

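NumNodes() exposes the size of the post-order node list through the iterator; ConnectorThroughput::InitNodes() below relies on it to size its sample tables. A one-line sketch (assuming tree is an ExecutionTree *):

// Hedged sketch: count the ops in a pipeline via the new iterator API.
int32_t n_nodes = tree->begin().NumNodes();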
@@ -3,4 +3,6 @@ add_library(engine-perf OBJECT
        monitor.cc
        device_queue_tracing.cc
        connector_size.cc
        dataset_iterator_tracing.cc)
        dataset_iterator_tracing.cc
        connector_throughput.cc
        )
@@ -14,7 +14,6 @@
 * limitations under the License.
 */
#include "dataset/engine/perf/connector_size.h"

#include <algorithm>
#include <fstream>
#include <memory>
@@ -13,8 +13,8 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_QUEUE_DEPTH_H
#define MINDSPORE_QUEUE_DEPTH_H
#ifndef DATASET_CONNECTOR_SIZE_H
#define DATASET_CONNECTOR_SIZE_H

#include <string>
#include <vector>

@@ -50,7 +50,7 @@ class ConnectorSize : public Sampling {
  // This function samples the connector size of every node within the ExecutionTree
  Status Sample() override;

  std::string Name() const override { return kDeviceQueueTracingName; };
  std::string Name() const override { return kConnectorSizeSamplingName; }

  // Save sampling data to file
  // @return Status - The error code return

@@ -65,6 +65,8 @@ class ConnectorSize : public Sampling {
  ExecutionTree *tree_ = nullptr;          // ExecutionTree pointer
  ConnectorSizeSampleTable sample_table_;  // Dataset structure to store all samples of connector size sampling
};

} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_QUEUE_DEPTH_H

#endif // DATASET_CONNECTOR_SIZE_H
@@ -0,0 +1,109 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <fstream>
#include <iterator>
#include <algorithm>
#include <memory>
#include <string>
#include <nlohmann/json.hpp>
#include "dataset/engine/perf/connector_throughput.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/util/path.h"

namespace mindspore {
namespace dataset {

// temporary helper
int ConnectorThroughput::InitNodes() {
  auto it = (*tree_).begin();
  return it.NumNodes();
}
// Sample action
Status ConnectorThroughput::Sample() {
  std::vector<int64_t> out_buffer_count_row(n_nodes_);
  std::vector<double> throughput_row(n_nodes_);
  TimePoint cur_time;  // initialised inside the loop, used outside the loop to update prev sample time.
  auto col = 0;
  for (const auto &node : *tree_) {
    auto cur_out_buffer_count = node.ConnectorOutBufferCount();
    out_buffer_count_row[col] = cur_out_buffer_count;
    auto sz = timestamps_.size();
    cur_time = std::chrono::steady_clock::now();
    auto _dt = std::chrono::duration_cast<std::chrono::microseconds>(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]);
    auto dt = std::chrono::duration<double>(_dt).count();
    auto prev_out_buffer_count = out_buffer_count_table_[col][out_buffer_count_table_.size() - 1];
    if (dt != 0) {
      auto thr = (cur_out_buffer_count - prev_out_buffer_count) / (1000 * dt);
      throughput_row[col] = thr;
    } else {
      throughput_row[col] = -1;
    }
    col++;
  }
  std::vector<TimePoint> v = {cur_time};  // temporary fix
  timestamps_.AddSample(v);
  // Push new row of sample
  out_buffer_count_table_.AddSample(out_buffer_count_row);
  throughput_.AddSample(throughput_row);
  return Status::OK();
}

json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr) {
  auto children = node.Children();
  std::vector<int32_t> children_id;
  std::transform(children.begin(), children.end(), std::back_inserter(children_id),
                 [](std::shared_ptr<DatasetOp> op) -> int32_t { return op->id(); });
  json json_node;
  json_node["op_id"] = node.id();
  json_node["op_type"] = node.Name();
  json_node["num_workers"] = node.num_workers();
  json metrics;
  metrics["output_queue"] = {{"throughput", thr}};

  json_node["metrics"] = metrics;
  if (!children_id.empty()) {
    json_node["children"] = children_id;
  }

  return json_node;
}

// Save profiling data to file
Status ConnectorThroughput::SaveToFile() {
  std::ofstream os(file_path_);
  json output;
  output["sampling_interval"] = 10;
  // Traverse the ExecutionTree for JSON node generation
  int col = 0;
  for (auto &node : *tree_) {
    std::vector<double> throughput;
    for (auto i = 0; i < throughput_.size(); i++) {
      throughput.push_back(throughput_[col][i]);
    }
    json json_node = ParseOpInfo(node, throughput);
    output["op_info"].push_back(json_node);
    col++;
  }
  os << output;
  return Status::OK();
}
Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
  file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString();
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore
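A note on units in Sample(): _dt is the gap between the two most recent timestamps converted to seconds, so the computed value

  thr = (cur_out_buffer_count - prev_out_buffer_count) / (1000 * dt)

is expressed in buffers per millisecond. As a worked example, if the previous sample recorded 1200 output buffers, the current one sees 1800, and dt = 0.1 s, then thr = (1800 - 1200) / (1000 * 0.1) = 6 buffers/ms, i.e. 6000 buffers/s. When dt is 0 the row stores -1 as an "invalid sample" marker.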
@@ -0,0 +1,100 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef DATASET_CONNECTOR_THROUGHPUT_H
#define DATASET_CONNECTOR_THROUGHPUT_H

#include <vector>
#include <chrono>
#include <fstream>
#include <string>
#include <nlohmann/json.hpp>
#include "dataset/engine/perf/profiling.h"
#include "dataset/engine/perf/perf_data.h"
#include "dataset/engine/perf/cyclic_array.h"
#include "dataset/engine/datasetops/dataset_op.h"

using json = nlohmann::json;
namespace mindspore {
namespace dataset {
class ExecutionTree;

// ConnectorThroughput samples the throughput of the output connector of each op in the pipeline.
// For the description of the data structure see perf_buffer.h
// It supports JSON serialization for external use.
class ConnectorThroughput : public Sampling {
  using OutBufferCount = PerfData<CyclicArray<int64_t>>;
  using Throughput = PerfData<CyclicArray<double>>;
  using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
  using TimeStamps = PerfData<CyclicArray<TimePoint>>;

 public:
  explicit ConnectorThroughput(ExecutionTree *tree, int64_t max_rows = 1000000)
      : tree_(tree),
        max_rows_(max_rows),
        n_nodes_(InitNodes()),
        out_buffer_count_table_(OutBufferCount(max_rows_, n_nodes_)),
        throughput_(Throughput(max_rows_, n_nodes_)),
        timestamps_(TimeStamps(max_rows_, 1)) {
    timestamps_.AddSample(std::vector<TimePoint>(1));
    out_buffer_count_table_.AddSample(std::vector<int64_t>(n_nodes_));
  }
  // Driver function for connector throughput sampling.
  // This function samples the connector throughput of every node within the ExecutionTree
  Status Sample() override;

  /* Status TestPrint() override {
    std::ofstream os("performance_monitor.txt");
    if (throughput_.size() == 0) {
      os << "data is empty" << std::endl;
      return Status::OK();
    }
    for (int i = 0; i < throughput_.size(); i++) {
      for (int j = 0; j < n_nodes_; j++) {
        os << throughput_[j][i] << " ";
      }
      os << std::endl;
    }
    return Status::OK();
  };*/

  // Traverse the tree nodes and count them
  int InitNodes();

  std::string Name() const override { return name_; };

  // Save sampling data to file
  // @return Status - The error code return
  Status SaveToFile() override;

  Status Init(const std::string &dir_path, const std::string &device_id);

  json ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr);

 private:
  ExecutionTree *tree_ = nullptr;  // ExecutionTree pointer
  int64_t max_rows_;
  int32_t n_nodes_;
  OutBufferCount out_buffer_count_table_;
  Throughput throughput_;
  TimeStamps timestamps_;
  std::string name_ = kConnectorThroughputSamplingName;
};

} // namespace dataset
} // namespace mindspore

#endif // DATASET_CONNECTOR_THROUGHPUT_H
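Given ParseOpInfo() and SaveToFile() in the implementation above, each entry of the serialized profile should look roughly like the sketch below; the op type, ids and numbers are illustrative, and the throughput array holds one value per recorded sample:

{
  "sampling_interval": 10,
  "op_info": [
    {
      "op_id": 0,
      "op_type": "BatchOp",
      "num_workers": 4,
      "metrics": {"output_queue": {"throughput": [5.2, 6.0, 5.8]}},
      "children": [1]
    }
  ]
}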
@@ -0,0 +1,197 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef DATASET_CYCLIC_ARRAY_H
#define DATASET_CYCLIC_ARRAY_H

#include <memory>
#include <algorithm>
#include <cstring>
#include <type_traits>
#include "dataset/core/constants.h"

namespace mindspore {
namespace dataset {

/// \class CyclicArray "include/cyclic_array.h"
/// \brief This is a container with a contiguous memory layout that only keeps the last N entries
/// when the number of entries exceeds the capacity.
/// Must be preallocated
template <typename T>
class CyclicArray {
 public:
  using value_type = T;
  class Iterator {
    // Add operator[] and make fully compliant with random access iterator
    // and add a const iterator
    // add resize(), empty()
   public:
    using iterator_category = std::random_access_iterator_tag;
    using value_type = CyclicArray::value_type;
    using difference_type = std::ptrdiff_t;
    using pointer = CyclicArray::value_type *;
    using reference = CyclicArray::value_type &;

    Iterator() = default;

    Iterator(dsize_t idx, pointer ptr, dsize_t capacity, dsize_t head)
        : cur_idx_(idx), ptr_(ptr), capacity_(capacity), head_(head) {}

    Iterator(const Iterator &rhs) = default;

    ~Iterator() = default;

    Iterator &operator++() {
      cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1);
      return *this;
    }

    Iterator operator++(int) {
      Iterator tmp(*this);
      cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1);
      return tmp;
    }

    Iterator &operator--() {
      cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1);
      return *this;
    }

    Iterator operator--(int) {
      Iterator tmp(*this);
      cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1);
      return tmp;
    }

    Iterator operator+(dsize_t x) { return Iterator((cur_idx_ + x) % (capacity_ + 1), ptr_, capacity_, head_); }

    Iterator operator-(dsize_t x) {
      return Iterator((cur_idx_ + (capacity_ + 1 - x)) % (capacity_ + 1), ptr_, capacity_, head_);
    }

    bool operator<(const Iterator &rhs) {
      return (head_ + cur_idx_) % (capacity_ + 1) < (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1);
    }

    bool operator>(const Iterator &rhs) {
      return (head_ + cur_idx_) % (capacity_ + 1) > (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1);
    }

    bool operator>=(const Iterator &rhs) {
      return (head_ + cur_idx_) % (capacity_ + 1) >= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1);
    }

    bool operator<=(const Iterator &rhs) {
      return (head_ + cur_idx_) % (capacity_ + 1) <= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1);
    }

    difference_type operator-(const Iterator &rhs) {
      return (cur_idx_ - rhs.cur_idx_ + capacity_ + 1) % (capacity_ + 1);
    }

    reference operator*() { return ptr_[cur_idx_]; }

    pointer operator->() { return &(ptr_[cur_idx_]); }

    bool operator==(const Iterator &rhs) { return cur_idx_ == rhs.cur_idx_; }

    bool operator!=(const Iterator &rhs) { return cur_idx_ != rhs.cur_idx_; }

   private:
    dsize_t cur_idx_;
    pointer ptr_;
    dsize_t capacity_;
    dsize_t head_;
  };

  /// \brief Default constructor
  CyclicArray() : buf_(nullptr), head_(0), tail_(0), size_(0), capacity_(0) {}

  /// \brief Constructor
  /// \param[in] capacity
  explicit CyclicArray(dsize_t capacity)
      : buf_(std::make_unique<T[]>(capacity + 1)), head_(0), tail_(0), size_(0), capacity_(capacity) {}

  CyclicArray(const CyclicArray<T> &rhs)
      : buf_(std::make_unique<T[]>(rhs.capacity_ + 1)),
        head_(rhs.head_),
        tail_(rhs.tail_),
        size_(rhs.size_),
        capacity_(rhs.capacity_) {
    std::copy(rhs.begin(), rhs.end(), begin());
  }

  CyclicArray(CyclicArray &&rhs) = default;

  ~CyclicArray() = default;

  /// \brief Iterator begin()
  Iterator begin() { return Iterator(head_, buf_.get(), capacity_, head_); }

  /// \brief Iterator end()
  Iterator end() { return Iterator(tail_, buf_.get(), capacity_, head_); }

  // not really const.
  Iterator begin() const { return Iterator(head_, buf_.get(), capacity_, head_); }

  Iterator end() const { return Iterator(tail_, buf_.get(), capacity_, head_); }

  /// \brief clear the array. Does not deallocate memory, capacity remains the same
  void clear() {
    head_ = 0;
    tail_ = 0;
    size_ = 0;
  }

  /// \brief returns current size
  dsize_t size() { return size_; }

  /// \brief returns capacity
  dsize_t capacity() { return capacity_; }

  /// \brief pushes a value
  /// \param[in] val value
  void push_back(T val) {
    buf_[tail_] = val;
    if (size_ >= capacity_) {
      (tail_ != capacity_) ? tail_++ : tail_ = 0;
      (head_ != capacity_) ? head_++ : head_ = 0;
    } else {
      tail_++;
      size_++;
    }
  }

  /// \brief returns const reference to an element of the array
  /// \param[in] idx index of the element
  /// \param[out] const T& reference to an element of the array
  const T &operator[](dsize_t idx) const { return buf_[(head_ + idx) % (capacity_ + 1)]; }

  /// \brief returns non-const reference to an element of the array
  /// \param[in] idx index of the element
  /// \param[out] T& reference to an element of the array
  T &operator[](dsize_t idx) { return buf_[(head_ + idx) % (capacity_ + 1)]; }

 private:
  std::unique_ptr<T[]> buf_;
  dsize_t head_;
  dsize_t tail_;
  dsize_t size_;
  dsize_t capacity_;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_CYCLIC_ARRAY_H
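A short usage sketch of the eviction behaviour (consistent with the unit tests added further below): once size() reaches capacity(), every additional push_back() overwrites the oldest element and index 0 moves to the next-oldest surviving entry.

// Hedged sketch of CyclicArray eviction semantics.
CyclicArray<int> window(3);    // keeps at most the 3 most recent values
for (int v = 1; v <= 5; ++v) {
  window.push_back(v);         // pushes 1, 2, 3, then evicts 1 and 2
}
// window.size() == 3, window.capacity() == 3
// window[0] == 3, window[1] == 4, window[2] == 5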
@@ -13,6 +13,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_DATASET_ITERATOR_TRACING_H
#define MINDSPORE_DATASET_ITERATOR_TRACING_H

@@ -28,7 +28,6 @@ Monitor::Monitor(ExecutionTree *tree) : tree_(tree) {
  max_samples_ = 0;
  cur_row_ = 0;
}

Status Monitor::operator()() {
  // Register this thread with TaskManager to receive proper interrupt signal.
  TaskManager::FindMe()->Post();
@@ -29,6 +29,7 @@ class ExecutionTree;
class Monitor {
 public:
  // Monitor object constructor

  explicit Monitor(ExecutionTree *tree);

  Monitor() = default;
@@ -0,0 +1,88 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef DATASET_PERF_DATA_H
#define DATASET_PERF_DATA_H

#include <vector>
#include "dataset/core/constants.h"

namespace mindspore {
namespace dataset {

// PerfData is a convenience class to record and store the data produced by Monitor.
// It represents a 2D column-major table in which every column stores the samples for one operator:
// the number of rows equals the number of samples and the number of columns equals the number of operators.
// The capacity is determined on construction and cannot be changed.
// ColumnType can be std::vector or CyclicArray. In the case of the latter, data can be added
// indefinitely without the risk of overflowing; otherwise the capacity must not be exceeded.
// Given PerfData pd(n_rows, n_cols), the element in column i and row j can be accessed as
// pd[i][j]

template <typename ColumnType>
class PerfData {
 public:
  PerfData() = default;
  ~PerfData() = default;
  PerfData(dsize_t max_rows, dsize_t n_cols) : counter_(0), max_rows_(max_rows), n_cols_(n_cols) {
    for (auto i = 0; i < n_cols_; i++) {
      data_.push_back(ColumnType(max_rows_));
    }
  }
  PerfData(const PerfData &rhs) = default;
  PerfData(PerfData &&rhs) = default;

  // Adds a row of data
  // T must be any container working with range based loops
  template <typename T>
  void AddSample(const T &row) {
    auto i = 0;
    for (const auto &e : row) {
      data_[i++].push_back(e);
    }
    counter_++;
  }

  // Fetches a row of data by copy
  template <typename V = typename ColumnType::value_type>
  auto Row(dsize_t idx) {
    std::vector<V> row(n_cols_);
    for (auto i = 0; i < n_cols_; i++) {
      row[i] = data_[i][idx];
    }
    return row;
  }

  // returns a column of data
  ColumnType &operator[](size_t idx) { return data_[idx]; }

  const ColumnType &operator[](size_t idx) const { return data_[idx]; }

  dsize_t size() { return counter_ < max_rows_ ? counter_ : max_rows_; }

  dsize_t capacity() { return max_rows_; }

 private:
  std::vector<ColumnType> data_;
  dsize_t counter_;
  dsize_t max_rows_;
  int n_cols_;
};

} // namespace dataset
} // namespace mindspore
#endif // DATASET_PERF_DATA_H
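A brief usage sketch (mirroring the unit tests added further below): each column holds the samples of one operator, AddSample() appends one row across all columns, and Row() copies a row back out.

// Hedged sketch of PerfData usage with CyclicArray columns.
PerfData<CyclicArray<int64_t>> table(1000, 3);    // up to 1000 rows, 3 operators
std::vector<int64_t> sample = {10, 20, 30};       // one value per operator
table.AddSample(sample);                          // row 0
sample = {12, 25, 31};
table.AddSample(sample);                          // row 1
auto row0 = table.Row(0);                         // {10, 20, 30}
int64_t newest_op2 = table[2][table.size() - 1];  // column 2, newest row -> 31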
@@ -14,7 +14,6 @@
 * limitations under the License.
 */
#include "dataset/engine/perf/profiling.h"

#include <sys/time.h>
#include <cstdlib>
#include <fstream>

@@ -23,6 +22,7 @@
#include "dataset/engine/perf/monitor.h"
#include "dataset/engine/perf/device_queue_tracing.h"
#include "dataset/engine/perf/connector_size.h"
#include "dataset/engine/perf/connector_throughput.h"
#include "dataset/engine/perf/dataset_iterator_tracing.h"
#include "utils/log_adapter.h"

@@ -72,9 +72,11 @@ Status ProfilingManager::Initialize() {
  std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
  RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));

  std::shared_ptr<Sampling> monitor_sampling = std::make_shared<ConnectorSize>(tree_);
  RETURN_IF_NOT_OK(RegisterSamplingNode(monitor_sampling));
  std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
  RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));

  std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
  RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));
  return Status::OK();
}

@@ -140,14 +142,15 @@ Status ProfilingManager::SaveProfilingData() {
    RETURN_IF_NOT_OK(node.second->SaveToFile());
  }
  MS_LOG(INFO) << "Save profiling data end.";

  return Status::OK();
}

double ProfilingTime::GetCurMilliSecond() {
  struct timeval tv = {0, 0};
  (void)gettimeofday(&tv, nullptr);
  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
int64_t ProfilingTime::GetCurMilliSecond() {
  // because cpplint does not allow using namespace
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::steady_clock;
  return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}
} // namespace dataset
} // namespace mindspore
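GetCurMilliSecond() now reads std::chrono::steady_clock instead of gettimeofday(), so the returned value is monotonic and safe for interval measurements even if the wall clock is adjusted. A minimal usage sketch:

// Hedged sketch: measuring an elapsed interval with the monotonic counter.
int64_t start_ms = ProfilingTime::GetCurMilliSecond();
// ... run the pipeline section being profiled ...
int64_t elapsed_ms = ProfilingTime::GetCurMilliSecond() - start_ms;
MS_LOG(INFO) << "Section took " << elapsed_ms << " ms.";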
@@ -20,6 +20,7 @@
#include <vector>
#include <unordered_map>
#include <memory>
#include <chrono>
#include "dataset/util/status.h"

namespace mindspore {

@@ -28,9 +29,10 @@ namespace dataset {
class Monitor;
class ExecutionTree;

const char kDeviceQueueTracingName[] = "Device Queue Tracing";
const char kDatasetIteratorTracingName[] = "Dataset Iterator Tracing";
const char kConnectorSizeSamplingName[] = "Connector Size Sampling";
const char kDeviceQueueTracingName[] = "Device_Queue_Tracing";
const char kDatasetIteratorTracingName[] = "Dataset_Iterator_Tracing";
const char kConnectorSizeSamplingName[] = "Connector_Size_Sampling";
const char kConnectorThroughputSamplingName[] = "Connector_Throughput_Sampling";

// Profiling is the basic unit of a profiling action.
// This base class encapsulates the serialization output logic.

@@ -59,6 +61,8 @@ class Sampling : public Profiling {
 public:
  // Sampling action function. This function will be invoked by the performance monitor thread.
  virtual Status Sample() = 0;
  // virtual Status TestPrint() = 0;
  virtual ~Sampling() = default;
};

// Tracing is a class of profiling which records samples upon request.

@@ -132,7 +136,7 @@ enum ProfilingTimeSubType {

class ProfilingTime {
 public:
  static double GetCurMilliSecond();
  static int64_t GetCurMilliSecond();
};

} // namespace dataset
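The sampling names switch to underscore-separated strings because each sampler embeds its Name() in the profiling file it writes from Init(). For ConnectorThroughput the resulting path is dir_path + "/pipeline_profiling_Connector_Throughput_Sampling_<device_id>.json", which is the PIPELINE_FILE_THR name asserted in test_profiling.py below. A hedged sketch of the same construction (plain string concatenation instead of the Path helper):

// Hedged sketch: how the output file name follows from the sampling name.
std::string dir_path = ".";
std::string device_id = "1";
std::string file_path = dir_path + "/pipeline_profiling_" + kConnectorThroughputSamplingName + "_" + device_id + ".json";
// -> "./pipeline_profiling_Connector_Throughput_Sampling_1.json"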
@@ -79,6 +79,8 @@ SET(DE_UT_SRCS
    mask_test.cc
    trucate_pair_test.cc
    concatenate_op_test.cc
    cyclic_array_test.cc
    perf_data_test.cc
    )

add_executable(de_ut_tests ${DE_UT_SRCS})
@@ -0,0 +1,128 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <iterator>
#include <algorithm>
#include "common/common.h"
#include "common/cvop_common.h"
#include "gtest/gtest.h"
#include "securec.h"
#include "dataset/util/de_error.h"
#include "dataset/engine/perf/cyclic_array.h"
#include <chrono>

using namespace mindspore::dataset;

class MindDataTestCyclicArray : public UT::Common {
 public:
  MindDataTestCyclicArray() {}
};

TEST_F(MindDataTestCyclicArray, Test1) {
  CyclicArray<int> arr(5);
  EXPECT_EQ(5, arr.capacity());
  EXPECT_EQ(0, arr.size());
  arr.push_back(0);
  EXPECT_EQ(5, arr.capacity());
  EXPECT_EQ(1, arr.size());
  EXPECT_EQ(arr[0], 0);
  arr.push_back(1);
  EXPECT_EQ(arr[1], 1);
  for (auto i = 2; i < 5; i++) {
    arr.push_back(i);
  }
  EXPECT_EQ(arr.capacity(), arr.size());
  EXPECT_EQ(1, arr[1]);
  EXPECT_EQ(4, arr[4]);
  arr[4] = 42;
  EXPECT_EQ(arr[4], 42);
  auto a = arr[4];
  EXPECT_EQ(a, 42);
  arr.push_back(5);
  EXPECT_EQ(arr[0], 1);
  EXPECT_EQ(arr[4], 5);

  CyclicArray<int> arr2 = arr;
  EXPECT_EQ(arr2.capacity(), arr.capacity());
  EXPECT_EQ(arr2.size(), arr.size());
  auto last = arr2.end();
  auto first = arr2.begin();
  for (auto i = 0; i < arr.size(); i++) {
    EXPECT_EQ(arr2[i], arr[i]);
  }

  arr.clear();
  EXPECT_EQ(arr.size(), 0);
  arr.push_back(42);
  arr.push_back(43);
  EXPECT_EQ(arr.size(), 2);
  EXPECT_EQ(arr.capacity(), 5);
  EXPECT_EQ(arr[0], 42);
  EXPECT_EQ(arr[1], 43);
  auto arr3 = arr;
  EXPECT_EQ(arr3.size(), 2);
  EXPECT_EQ(arr3.capacity(), 5);
  EXPECT_EQ(arr.size(), 2);
  EXPECT_EQ(arr.capacity(), 5);
  EXPECT_EQ(arr[0], arr3[0]);
  EXPECT_EQ(arr[1], arr3[1]);

  arr.clear();
  arr.push_back(21);
  arr.push_back(22);
  EXPECT_EQ(arr[arr.size() - 1], 22);
  for (auto i = 23; i < 27; i++) {
    arr.push_back(i);
  }
  EXPECT_EQ(arr[0], 22);
  EXPECT_EQ(arr[arr.size() - 1], 26);
}

TEST_F(MindDataTestCyclicArray, TestIterator) {
  CyclicArray<int> arr(5);
  for (auto i = 0; i < arr.capacity(); i++) {
    arr.push_back(i);
  }
  arr.push_back(6);
  arr.push_back(7);
  auto i = 0;
  for (auto it = arr.begin(); it != arr.end(); ++it) {
    EXPECT_EQ(*it, arr[i++]);
  }

  std::iota(arr.begin(), arr.end(), -4);
  EXPECT_EQ(arr[0], -4);
  EXPECT_EQ(arr[4], 0);
  const auto sz = 1000000;
  CyclicArray<int> arr2(sz);
  for (auto i = 0; i < sz - 1; i++) {
    arr.push_back(0);
  }
  const auto val = -500000;
  std::iota(arr2.begin(), arr2.end() + sz, val);
  EXPECT_EQ(*arr2.begin(), val);
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle(arr2.begin(), arr2.end(), g);
  std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; });
  EXPECT_EQ(*arr2.begin(), val);
  const auto new_val = -600000;
  for (auto i = 0; i < 100; i++) {
    arr2.push_back(new_val);
  }
  EXPECT_EQ(*(--arr2.end()), new_val);
  std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; });
  EXPECT_EQ(*arr2.begin(), new_val);
}
@@ -0,0 +1,71 @@
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "common/common.h"
#include "common/cvop_common.h"
#include "gtest/gtest.h"
#include "securec.h"
#include "dataset/util/de_error.h"
#include "dataset/engine/perf/cyclic_array.h"
#include "dataset/engine/perf/perf_data.h"

using namespace mindspore::dataset;

class MindDataTestPerfData : public UT::Common {
 public:
  MindDataTestPerfData() {}
};

TEST_F(MindDataTestPerfData, Test1) {
  PerfData<std::vector<int>> p1(2, 3);
  PerfData<CyclicArray<int>> p2(2, 3);
  EXPECT_EQ(p1.capacity(), p2.capacity());
  std::vector<int> row = {1, 2, 3};
  p1.AddSample(row);
  p2.AddSample(row);
  EXPECT_EQ(p1.size(), p2.size());
  p1.AddSample(row);
  p2.AddSample(row);
  EXPECT_EQ(p1.size(), p2.size());
  row = {4, 5, 6};
  p2.AddSample(row);
  auto r1 = p2.Row<int>(static_cast<int64_t>(0));
  for (auto i = 0; i < 3; i++) {
    EXPECT_EQ(r1[i], i + 1);
  }

  auto r2 = p2.Row<int>(1);
  for (auto i = 0; i < 3; i++) {
    EXPECT_EQ(r2[i], i + 4);
  }

  EXPECT_EQ(p2[0][1], 4);
  EXPECT_EQ(p2[1][1], 5);
  EXPECT_EQ(p2[2][1], 6);
}

TEST_F(MindDataTestPerfData, Test2) {
  auto pd = PerfData<CyclicArray<int>>(1000000, 3);
  auto row = {1, 2, 3};
  pd.AddSample(row);
  EXPECT_EQ(pd[0][0], 1);
  EXPECT_EQ(pd[1][0], 2);
  EXPECT_EQ(pd[2][0], 3);
  row = {4, 5, 6};
  pd.AddSample(row);
  EXPECT_EQ(pd[0][0], 1);
  EXPECT_EQ(pd[1][0], 2);
  EXPECT_EQ(pd[2][0], 3);
}
@@ -23,7 +23,8 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

PIPELINE_FILE = "./pipeline_profiling_1.json"
PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json"
PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"


@@ -43,8 +44,10 @@ def test_profiling_simple_pipeline():
    for _ in data1:
        pass

    assert os.path.exists(PIPELINE_FILE) is True
    os.remove(PIPELINE_FILE)
    assert os.path.exists(PIPELINE_FILE_SIZE) is True
    os.remove(PIPELINE_FILE_SIZE)
    assert os.path.exists(PIPELINE_FILE_THR) is True
    os.remove(PIPELINE_FILE_THR)
    assert os.path.exists(DATASET_ITERATOR_FILE) is True
    os.remove(DATASET_ITERATOR_FILE)
    del os.environ['PROFILING_MODE']

@@ -74,8 +77,10 @@ def test_profiling_complex_pipeline():
    for _ in data3:
        pass

    assert os.path.exists(PIPELINE_FILE) is True
    os.remove(PIPELINE_FILE)
    assert os.path.exists(PIPELINE_FILE_SIZE) is True
    os.remove(PIPELINE_FILE_SIZE)
    assert os.path.exists(PIPELINE_FILE_THR) is True
    os.remove(PIPELINE_FILE_THR)
    assert os.path.exists(DATASET_ITERATOR_FILE) is True
    os.remove(DATASET_ITERATOR_FILE)
    del os.environ['PROFILING_MODE']

@@ -103,8 +108,10 @@ def test_profiling_sampling_iterval():
    for _ in data1:
        pass

    assert os.path.exists(PIPELINE_FILE) is True
    os.remove(PIPELINE_FILE)
    assert os.path.exists(PIPELINE_FILE_SIZE) is True
    os.remove(PIPELINE_FILE_SIZE)
    assert os.path.exists(PIPELINE_FILE_THR) is True
    os.remove(PIPELINE_FILE_THR)
    assert os.path.exists(DATASET_ITERATOR_FILE) is True
    os.remove(DATASET_ITERATOR_FILE)