!11582 dataset: add cpu utilization profiling

From: @ms_yan
Reviewed-by: 
Signed-off-by:
mindspore-ci-bot 2021-02-01 16:43:44 +08:00 committed by Gitee
commit 7ec4aa92c7
43 changed files with 1076 additions and 161 deletions

View File

@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/api/python/pybind_conversion.h"
namespace mindspore {
@@ -63,6 +62,25 @@ std::vector<std::string> toStringVector(const py::list list) {
return vector;
}
std::vector<pid_t> toIntVector(const py::list input_list) {
std::vector<pid_t> vector;
if (!input_list.empty()) {
std::transform(input_list.begin(), input_list.end(), std::back_inserter(vector),
[&](const py::handle &handle) { return static_cast<pid_t>(toInt(handle)); });
}
return vector;
}
std::unordered_map<int32_t, std::vector<pid_t>> toIntMap(const py::dict input_dict) {
std::unordered_map<int32_t, std::vector<pid_t>> map;
if (!input_dict.empty()) {
for (auto p : input_dict) {
(void)map.emplace(toInt(p.first), toIntVector(py::reinterpret_borrow<py::list>(p.second)));
}
}
return map;
}
std::pair<int64_t, int64_t> toIntPair(const py::tuple tuple) {
std::pair<int64_t, int64_t> pair;
if (!tuple.empty()) {

View File

@@ -17,11 +17,13 @@
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_API_PYTHON_PYBIND_CONVERSION_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_API_PYTHON_PYBIND_CONVERSION_H_
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <unordered_map>
#include <vector>
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
@@ -53,6 +55,10 @@ std::map<std::string, int32_t> toStringMap(const py::dict dict);
std::vector<std::string> toStringVector(const py::list list);
std::vector<pid_t> toIntVector(const py::list input_list);
std::unordered_map<int32_t, std::vector<pid_t>> toIntMap(const py::dict input_dict);
std::pair<int64_t, int64_t> toIntPair(const py::tuple tuple);
std::vector<std::pair<int, int>> toPairVector(const py::list list);

View File

@@ -23,6 +23,7 @@
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/perf/profiling.h"
namespace mindspore {
namespace dataset {
@@ -185,7 +186,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
RETURN_IF_NOT_OK(curr_buffer_->PopRow(out_row));
if (tracing_ != nullptr) {
cur_batch_num_++;
tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_);
tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_,
ProfilingTime::GetCurMilliSecond());
}
return Status::OK();
}

View File

@@ -265,7 +265,7 @@ Status BatchOp::LaunchThreadsAndInitOp() {
}
RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name()));
tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name(), id()));
return Status::OK();
}

View File

@@ -45,8 +45,8 @@ Status BuildSentencePieceVocabOp::operator()() {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
RETURN_IF_NOT_OK(sentence_queue_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this)));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
"sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this), nullptr, id()));
TaskManager::FindMe()->Post();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
TensorRow new_row;

View File

@@ -86,8 +86,9 @@ Status BuildVocabOp::operator()() {
RETURN_IF_NOT_OK(collector_queue_->Register(tree_->AllTasks()));
// launch worker threads and collector thread
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&BuildVocabOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("collector", std::bind(&BuildVocabOp::CollectorThread, this)));
tree_->LaunchWorkers(num_workers_, std::bind(&BuildVocabOp::WorkerEntry, this, std::placeholders::_1), "", id()));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("collector", std::bind(&BuildVocabOp::CollectorThread, this), nullptr, id()));
TaskManager::FindMe()->Post();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
TensorRow new_row;

View File

@@ -65,7 +65,7 @@ Status CacheLookupOp::operator()() {
RETURN_IF_NOT_OK(RegisterResources());
// Kick off the workers
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&CacheLookupOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&CacheLookupOp::WorkerEntry, this, std::placeholders::_1), "", id()));
// required task group sync after launching workers
TaskManager::FindMe()->Post();
// We have to wait until the leaf op has handshake with us.

View File

@@ -60,13 +60,14 @@ Status CacheMergeOp::operator()() {
io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz);
RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(
num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry"));
num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
std::bind(&CacheMergeOp::CacheMissWorkerEntry, this, std::placeholders::_1),
Name() + "::CacheMissWorkerEntry"));
Name() + "::CacheMissWorkerEntry", id()));
// One dedicated thread to move TensorRow from the pool to the cache server
for (auto i = 0; i < num_cleaners_; ++i) {
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this)));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this), nullptr, id()));
}
TaskManager::FindMe()->Post();
return Status::OK();

View File

@@ -26,7 +26,6 @@
#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"
@@ -134,8 +133,8 @@ Status DeviceQueueOp::operator()() {
Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend.";
int64_t send_batch = 0;
double batch_start_time, end_time;
int32_t batch_cost, tdt_cost;
uint64_t batch_start_time, end_time;
int32_t tdt_cost;
int32_t connector_size = 0;
int32_t connector_capacity;
bool is_break_loop = false;
@@ -178,20 +177,8 @@ Status DeviceQueueOp::SendDataToAscend() {
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
}
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
// record push tdt time
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost);
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost);
// record pipeline time
profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost);
batch_start_time = end_time;
// record connector depth
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size);
}
ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
connector_capacity, connector_size);
send_batch++;
if (total_batch_ > 0 && send_batch >= total_batch_) {
@@ -273,9 +260,9 @@ Status DeviceQueueOp::LaunchParallelCopyThread() {
receive_queues_.Init(num_workers_, queue_capacity_);
RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue", std::bind(&DeviceQueueOp::PushDataToGPU, this)));
tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
std::bind(&DeviceQueueOp::PushDataToGPU, this), nullptr, id()));
return Status::OK();
}
@@ -285,8 +272,8 @@ Status DeviceQueueOp::PushDataToGPU() {
// and will overload in distribute scenario, so don't remove this line
cudaSetDevice(rank_id_);
TaskManager::FindMe()->Post();
double batch_start_time = 0.0;
double end_time = 0.0;
uint64_t batch_start_time = 0;
uint64_t end_time = 0;
int32_t batch_cost = 0;
int32_t push_cost = 0;
int32_t connector_size = 0;
@@ -345,15 +332,15 @@ Status DeviceQueueOp::PushDataToGPU() {
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
// record push data time
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost);
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time);
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost);
profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time);
// record pipeline time
profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost);
profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost, end_time);
batch_start_time = end_time;
// record connector depth
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size);
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size, end_time);
connector_size = gpu_item_connector_->size();
connector_capacity = gpu_item_connector_->capacity();
}
@@ -508,5 +495,23 @@ Status DeviceQueueOp::Accept(NodePass *p, bool *const modified) {
return p->RunOnNode(shared_from_base<DeviceQueueOp>(), modified);
}
void DeviceQueueOp::ProfilingRecorder(bool isProfilingEnable, std::shared_ptr<DeviceQueueTracing> profiling_node,
int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
// Record the pipeline profiling info
if (isProfilingEnable) {
*end_time = ProfilingTime::GetCurMilliSecond();
// record push tdt time
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time);
int32_t batch_cost = (int32_t)(*end_time - *batch_start_time);
// record batch time
profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time);
// record pipeline time
profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time);
*batch_start_time = *end_time;
// record connector depth
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
}
}
} // namespace dataset
} // namespace mindspore
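The timing arithmetic consolidated into ProfilingRecorder reduces to two subtractions per batch. A minimal standalone sketch with hypothetical millisecond values (not part of the patch):

#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical timestamps mirroring ProfilingRecorder above.
  uint64_t batch_start_time = 1000;  // when the previous batch finished (ms)
  uint64_t end_time = 1025;          // now, after pushing the current batch (ms)
  int32_t tdt_cost = 5;              // time spent pushing the batch to the device queue (ms)
  int32_t batch_cost = static_cast<int32_t>(end_time - batch_start_time);  // 25 ms end to end
  int32_t pipeline_time = batch_cost - tdt_cost;                           // 20 ms spent in the host pipeline
  std::cout << "batch=" << batch_cost << "ms pipeline=" << pipeline_time << "ms" << std::endl;
  return 0;
}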

View File

@@ -23,6 +23,7 @@
#include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h"
#ifdef ENABLE_TDTQUE
@@ -173,6 +174,11 @@ class DeviceQueueOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *const modified) override;
// Record the pipeline profiling info
void ProfilingRecorder(bool isProfilingEnable, std::shared_ptr<DeviceQueueTracing> profiling_node, int64_t send_batch,
int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time, int32_t connector_capacity,
int32_t connector_size);
// Op name getter
// @return Name of the current Op
std::string Name() const override { return kDeviceQueueOp; }

View File

@@ -71,7 +71,7 @@ Status FilterOp::operator()() {
filter_queues_.Init(num_workers_, oc_queue_size_);
RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks()));
Status rc =
tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1), Name());
tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1), Name(), id());
// Synchronize with TaskManager.
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(rc);

View File

@@ -464,65 +464,7 @@ Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, Tenso
// loop over each column descriptor, this can be optimized by switch cases
for (int32_t i = 0; i < columns; i++) {
// special case to handle
if (data_schema_->column(i).name() == "id") {
// id is internal, special case to load from file
RETURN_IF_NOT_OK(LoadIDTensor(file, i, row));
continue;
}
// find if key does not exist, insert placeholder nullptr if not found
if (js.find(data_schema_->column(i).name()) == js.end()) {
// iterator not found, push nullptr as placeholder
MS_LOG(INFO) << "Pushing empty tensor for column: " << data_schema_->column(i).name() << ".";
RETURN_IF_NOT_OK(LoadEmptyTensor(i, row));
continue;
}
nlohmann::json column_value = js.at(data_schema_->column(i).name());
MS_LOG(INFO) << "This column is: " << data_schema_->column(i).name() << ".";
bool is_array = column_value.is_array();
// load single string
if (column_value.is_string() && data_schema_->column(i).type() == DataType::DE_STRING) {
RETURN_IF_NOT_OK(LoadStringTensor(column_value, i, row));
continue;
}
// load string array
if (is_array && data_schema_->column(i).type() == DataType::DE_STRING) {
RETURN_IF_NOT_OK(LoadStringArrayTensor(column_value, i, row));
continue;
}
// load image file
if (column_value.is_string() && data_schema_->column(i).type() != DataType::DE_STRING) {
std::string image_file_path = column_value;
RETURN_IF_NOT_OK(LoadImageTensor(image_file_path, i, row));
continue;
}
// load float value
if (!is_array && (data_schema_->column(i).type() == DataType::DE_FLOAT32 ||
data_schema_->column(i).type() == DataType::DE_FLOAT64)) {
RETURN_IF_NOT_OK(LoadFloatTensor(column_value, i, row));
continue;
}
// load float array
if (is_array && (data_schema_->column(i).type() == DataType::DE_FLOAT32 ||
data_schema_->column(i).type() == DataType::DE_FLOAT64)) {
RETURN_IF_NOT_OK(LoadFloatArrayTensor(column_value, i, row));
continue;
}
// int value
if (!is_array && (data_schema_->column(i).type() == DataType::DE_INT64 ||
data_schema_->column(i).type() == DataType::DE_INT32)) {
RETURN_IF_NOT_OK(LoadIntTensor(column_value, i, row));
continue;
}
// int array
if (is_array && (data_schema_->column(i).type() == DataType::DE_INT64 ||
data_schema_->column(i).type() == DataType::DE_INT32)) {
RETURN_IF_NOT_OK(LoadIntArrayTensor(column_value, i, row));
continue;
} else {
MS_LOG(WARNING) << "Value type for column: " << data_schema_->column(i).name() << " is not supported.";
continue;
}
RETURN_IF_NOT_OK(loadColumnData(file, i, js, row));
}
} catch (const std::exception &err) {
file_handle.close();
@@ -535,6 +477,60 @@ Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, Tenso
return Status::OK();
}
Status AlbumOp::loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row) {
int32_t i = index;
// special case to handle
if (data_schema_->column(i).name() == "id") {
// id is internal, special case to load from file
return LoadIDTensor(file, i, row);
}
// find if key does not exist, insert placeholder nullptr if not found
if (js.find(data_schema_->column(i).name()) == js.end()) {
// iterator not found, push nullptr as placeholder
MS_LOG(INFO) << "Pushing empty tensor for column: " << data_schema_->column(i).name() << ".";
return LoadEmptyTensor(i, row);
}
nlohmann::json column_value = js.at(data_schema_->column(i).name());
MS_LOG(INFO) << "This column is: " << data_schema_->column(i).name() << ".";
bool is_array = column_value.is_array();
// load single string
if (column_value.is_string() && data_schema_->column(i).type() == DataType::DE_STRING) {
return LoadStringTensor(column_value, i, row);
}
// load string array
if (is_array && data_schema_->column(i).type() == DataType::DE_STRING) {
return LoadStringArrayTensor(column_value, i, row);
}
// load image file
if (column_value.is_string() && data_schema_->column(i).type() != DataType::DE_STRING) {
std::string image_file_path = column_value;
return LoadImageTensor(image_file_path, i, row);
}
// load float value
bool judge_float = (data_schema_->column(i).type() == DataType::DE_FLOAT32) ||
(data_schema_->column(i).type() == DataType::DE_FLOAT64);
if (!is_array && judge_float) {
return LoadFloatTensor(column_value, i, row);
}
// load float array
if (is_array && judge_float) {
return LoadFloatArrayTensor(column_value, i, row);
}
// int value
if (!is_array &&
(data_schema_->column(i).type() == DataType::DE_INT64 || data_schema_->column(i).type() == DataType::DE_INT32)) {
return LoadIntTensor(column_value, i, row);
}
// int array
if (is_array &&
(data_schema_->column(i).type() == DataType::DE_INT64 || data_schema_->column(i).type() == DataType::DE_INT32)) {
return LoadIntArrayTensor(column_value, i, row);
} else {
MS_LOG(WARNING) << "Value type for column: " << data_schema_->column(i).name() << " is not supported.";
return Status::OK();
}
}
// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer
Status AlbumOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
std::unique_ptr<TensorQTable> deq = std::make_unique<TensorQTable>();
@@ -587,7 +583,8 @@ Status AlbumOp::LaunchThreadsAndInitOp() {
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
// launch main workers that load DataBuffers by reading all images
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&AlbumOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&AlbumOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->InitSampler()); // pass numRows to Sampler
return Status::OK();

View File

@@ -134,7 +134,7 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
Status SanityCheck();
/// \brief The builder "build" method creates the final object.
/// \param[inout] std::shared_ptr<AlbumOp> *op - DatasetOp
/// \param[in, out] std::shared_ptr<AlbumOp> *op - DatasetOp
/// \return Status The status code returned
Status Build(std::shared_ptr<AlbumOp> *op);
@@ -210,74 +210,82 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
/// \brief Load image to tensor row
/// \param[in] image_file Image name of file
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadImageTensor(const std::string &image_file, uint32_t col_num, TensorRow *row);
/// \brief Load vector of ints to tensor, append tensor to tensor row
/// \param[in] json_obj Json object containing multi-dimensional label
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadIntArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load vector of floats to tensor, append tensor to tensor row
/// \param[in] json_obj Json object containing array data
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadFloatArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load string array into a tensor, append tensor to tensor row
/// \param[in] json_obj Json object containing string tensor
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadStringArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load string into a tensor, append tensor to tensor row
/// \param[in] json_obj Json object containing string tensor
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadStringTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load float value to tensor row
/// \param[in] json_obj Json object containing float
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadFloatTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load int value to tensor row
/// \param[in] json_obj Json object containing int
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row);
/// \brief Load emtpy tensor to tensor row
/// \brief Load empty tensor to tensor row
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadEmptyTensor(uint32_t col_num, TensorRow *row);
/// \brief Load id from file name to tensor row
/// \param[in] file The file name to get ID from
/// \param[in] col_num Column num in schema
/// \param[inout] row Tensor row to push to
/// \param[in, out] row Tensor row to push to
/// \return Status The status code returned
Status LoadIDTensor(const std::string &file, uint32_t col_num, TensorRow *row);
/// \brief Load a tensor row according to a json file
/// \param[in] row_id_type row_id - id for this tensor row
/// \param[in] ImageColumns file Json file location
/// \param[inout] TensorRow row Json content stored into a tensor row
/// \param[in, out] TensorRow row Json content stored into a tensor row
/// \return Status The status code returned
Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row);
/// \brief Load a tensor column according to a json file
/// \param[in] ImageColumns file Json file location
/// \param[in] index - certain column index
/// \param[in] js - json object
/// \param[in, out] TensorRow row Json content stored into a tensor row
/// \return Status The status code returned
Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row);
/// \param[in] const std::vector<int64_t> &keys Keys in ioblock
/// \param[inout] std::unique_ptr<DataBuffer> db Databuffer to push to
/// \param[in, out] std::unique_ptr<DataBuffer> db Databuffer to push to
/// \return Status The status code returned
Status LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db);

View File

@@ -102,8 +102,10 @@ Status CelebAOp::LaunchThreadsAndInitOp() {
RETURN_IF_NOT_OK(attr_info_queue_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this), nullptr, id()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1), Name(), id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(ParseImageAttrInfo());
RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));

View File

@@ -165,9 +165,10 @@ Status CifarOp::LaunchThreadsAndInitOp() {
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
"Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this), nullptr, id()));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
// The order of the following 2 functions must not be changed!
RETURN_IF_NOT_OK(ParseCifarData()); // Parse cifar data and get num rows, blocking

View File

@@ -237,9 +237,10 @@ Status ClueOp::operator()() {
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
// launch one thread, responsible for filling IoBlockQueue
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this), "", id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&ClueOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&ClueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
// must be called after launching workers.
TaskManager::FindMe()->Post();

View File

@@ -638,7 +638,8 @@ Status CocoOp::LaunchThreadsAndInitOp() {
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CocoOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&CocoOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->ParseAnnotationIds());
RETURN_IF_NOT_OK(this->InitSampler());

View File

@@ -555,9 +555,10 @@ Status CsvOp::operator()() {
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
// launch one thread, responsible for filling IoBlockQueue
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this), "", id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CsvOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&CsvOp::WorkerEntry, this, std::placeholders::_1), "", id()));
// must be called after launching workers.
TaskManager::FindMe()->Post();

View File

@@ -385,12 +385,13 @@ Status ImageFolderOp::LaunchThreadsAndInitOp() {
// 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
// 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
// 3) Launch main workers that load DataBuffers by reading all images
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this)));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
Name() + "::PrescanWorkerEntry"));
Name() + "::PrescanWorkerEntry", id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(
num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry"));
num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id()));
TaskManager::FindMe()->Post();
// The order of the following 2 functions must not be changed!
RETURN_IF_NOT_OK(this->PrescanMasterEntry(folder_path_)); // Master thread of pre-scan workers, blocking

View File

@@ -152,7 +152,7 @@ Status ManifestOp::LaunchThreadsAndInitOp() {
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(ParseManifestFile());
RETURN_IF_NOT_OK(CountDatasetInfo());

View File

@@ -446,7 +446,7 @@ Status MindRecordOp::LaunchThreadAndInitOp() {
}
// Launch main workers that load DataBuffers by reading all images
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
return Status::OK();
}

View File

@@ -420,7 +420,8 @@ Status MnistOp::LaunchThreadsAndInitOp() {
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->WalkAllFiles());
RETURN_IF_NOT_OK(this->ParseMnistData());

View File

@@ -197,7 +197,7 @@ Status RandomDataOp::operator()() {
// RandomDataOp doesn't need the master thread to stay around. Kick off the workers and then master exits.
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1), "", id()));
// required task group setup after launching workers
TaskManager::FindMe()->Post();

View File

@@ -385,11 +385,11 @@ Status TextFileOp::operator()() {
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
// launch one thread, responsible for filling IoBlockQueue
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this), Name(), id()));
// Read data from disk into buffers
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&TextFileOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&TextFileOp::WorkerEntry, this, std::placeholders::_1), Name(), id()));
// must be called after launching workers.
TaskManager::FindMe()->Post();

View File

@@ -234,12 +234,12 @@ Status TFReaderOp::operator()() {
RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));
// launch one thread, responsible for filling mIOBlockQueue
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TFReaderOp::WaitToFillIOBlockQueue, this)));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TFReaderOp::WaitToFillIOBlockQueue, this), "", id()));
// launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
// data from disk into buffers
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&TFReaderOp::WorkerEntry, this, std::placeholders::_1)));
tree_->LaunchWorkers(num_workers_, std::bind(&TFReaderOp::WorkerEntry, this, std::placeholders::_1), "", id()));
// must be called after launching workers. workers can't be spawned after this post,
// so workers have to be kept alive until the end of the program

View File

@@ -366,6 +366,7 @@ Status VOCOp::ParseAnnotationBbox(const std::string &path) {
} else {
RETURN_STATUS_UNEXPECTED("Invalid data, bndbox dismatch in " + path);
}
if (label_name != "" && (class_index_.empty() || class_index_.find(label_name) != class_index_.end()) && xmin > 0 &&
ymin > 0 && xmax > xmin && ymax > ymin) {
std::vector<float> bbox_list = {xmin, ymin, xmax - xmin, ymax - ymin, difficult, truncated};
@@ -389,7 +390,8 @@ Status VOCOp::LaunchThreadsAndInitOp() {
}
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(this->ParseImageIds());
if (task_type_ == TaskType::Detection) {

View File

@@ -208,7 +208,7 @@ Status ExecutionTree::Launch() {
// the launching tree/user thread. Do not exec any thread for an inlined op.
itr->state_ = DatasetOp::OpState::kDeOpRunning;
if (!itr->inlined()) {
RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr)));
RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr), nullptr, itr->id()));
// Set the state of the Operator as running. This only matters in Leaf ops, CacheOp and TakeOp
}
}
@@ -237,7 +237,8 @@ ExecutionTree::Iterator::Iterator(const std::shared_ptr<DatasetOp> &root) : ind_
// Given the number of workers, launches the worker entry function for each. Essentially a
// wrapper for the TaskGroup handling that is stored inside the execution tree.
Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name) {
Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name,
int32_t operator_id) {
int32_t num_cpu_threads = GlobalContext::Instance()->config_manager()->num_cpu_threads();
// this performs check that num_workers is positive and not unreasonably large which could happen
// for example, un-initialized variable. uint16 max is 65536 which is large enough to cover everything
@@ -249,7 +250,7 @@ Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(ui
<< std::to_string(num_cpu_threads) << ", the maximum number of threads on this CPU.";
}
for (int32_t i = 0; i < num_workers; ++i) {
RETURN_IF_NOT_OK(tg_->CreateAsyncTask(name, std::bind(func, i)));
RETURN_IF_NOT_OK(tg_->CreateAsyncTask(name, std::bind(func, i), nullptr, operator_id));
}
return Status::OK();
}

View File

@@ -155,8 +155,11 @@ class ExecutionTree {
// wrapper for the TaskGroup handling that is stored inside the execution tree.
// @param num_workers - The number of workers to launch
// @param func - The function entry point that workers will execute
// @param name - The description of the workers to launch
// @param operator_id - The id of the corresponding operator; defaults to -1 when not inherited from a dataset op.
// @return Status The status code returned
Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name = "");
Status LaunchWorkers(int32_t num_workers, std::function<Status(uint32_t)> func, std::string name = "",
int32_t operator_id = -1);
// Getter method
// @return shared_ptr to the root operator

View File

@@ -5,4 +5,5 @@ add_library(engine-perf OBJECT
connector_size.cc
dataset_iterator_tracing.cc
connector_throughput.cc
cpu_sampling.cc
)

View File

@@ -0,0 +1,567 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/perf/cpu_sampling.h"
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
#include <sys/syscall.h>
#endif
#include <math.h>
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <memory>
#include <string>
#include "minddata/dataset/api/python/pybind_conversion.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/path.h"
using json = nlohmann::json;
namespace mindspore {
namespace dataset {
bool BaseCpu::fetched_all_process_shared = false;
std::unordered_map<int32_t, std::vector<pid_t>> BaseCpu::op_process_shared = {};
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
#define USING_LINUX
#endif
BaseCpu::BaseCpu() {
pre_cpu_stat_.user_stat_ = 0;
pre_cpu_stat_.sys_stat_ = 0;
pre_cpu_stat_.io_stat_ = 0;
pre_cpu_stat_.idle_stat_ = 0;
pre_cpu_stat_.total_stat_ = 0;
}
Status DeviceCpu::ParseCpuInfo(const std::string &str) {
CpuStat cpu_stat;
uint64_t nice = 0;
uint64_t irq = 0;
uint64_t softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_,
&cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) {
return Status(StatusCode::kUnexpectedError, "Get device CPU failed.");
}
cpu_stat.total_stat_ =
cpu_stat.user_stat_ + nice + cpu_stat.sys_stat_ + cpu_stat.idle_stat_ + cpu_stat.io_stat_ + irq + softirq;
// Calculate the utilization from the second sampling
if (!first_collect_) {
CpuUtil info;
info.user_utilization_ = floor((cpu_stat.user_stat_ - pre_cpu_stat_.user_stat_) * 1.0 /
(cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 +
0.5);
info.sys_utilization_ = floor((cpu_stat.sys_stat_ - pre_cpu_stat_.sys_stat_) * 1.0 /
(cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 +
0.5);
info.io_utilization_ = floor((cpu_stat.io_stat_ - pre_cpu_stat_.io_stat_) * 1.0 /
(cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 +
0.5);
info.idle_utilization_ = floor((cpu_stat.idle_stat_ - pre_cpu_stat_.idle_stat_) * 1.0 /
(cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 +
0.5);
cpu_util_.emplace_back(info);
}
pre_cpu_stat_.user_stat_ = cpu_stat.user_stat_;
pre_cpu_stat_.sys_stat_ = cpu_stat.sys_stat_;
pre_cpu_stat_.io_stat_ = cpu_stat.io_stat_;
pre_cpu_stat_.idle_stat_ = cpu_stat.idle_stat_;
pre_cpu_stat_.total_stat_ = cpu_stat.total_stat_;
return Status::OK();
}
Status DeviceCpu::ParseCtxt(const std::string &str) {
uint64_t ctxt;
if (std::sscanf(str.c_str(), "%*s %lu", &ctxt) == EOF) {
return Status(StatusCode::kUnexpectedError, "Get context switch count failed.");
}
// Calculate the utilization from the second sampling
if (!first_collect_) {
context_switch_count_.push_back(ctxt - pre_context_switch_count_);
}
pre_context_switch_count_ = ctxt;
return Status::OK();
}
Status DeviceCpu::ParseRunningProcess(const std::string &str) {
uint32_t running_process;
if (std::sscanf(str.c_str(), "%*s %u", &running_process) == EOF) {
return Status(StatusCode::kUnexpectedError, "Get running process count failed.");
}
// Drop the first value so the count matches the CPU utilization samples
if (!first_collect_) {
running_process_.push_back(running_process);
}
return Status::OK();
}
Status DeviceCpu::Collect(ExecutionTree *tree) {
std::ifstream file("/proc/stat");
if (!file.is_open()) {
MS_LOG(WARNING) << "Open CPU file failed when collect CPU information";
return Status::OK();
}
bool first_line = true;
std::string line;
while (getline(file, line)) {
if (first_line) {
first_line = false;
RETURN_IF_NOT_OK(ParseCpuInfo(line));
}
if (line.find("ctxt") != std::string::npos) {
RETURN_IF_NOT_OK(ParseCtxt(line));
}
if (line.find("procs_running") != std::string::npos) {
RETURN_IF_NOT_OK(ParseRunningProcess(line));
}
}
file.close();
first_collect_ = false;
return Status::OK();
}
Status DeviceCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
json output;
if (path.Exists()) {
MS_LOG(DEBUG) << file_path << " exists already";
std::ifstream file(file_path);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}
std::vector<int8_t> user_util;
std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(user_util),
[&](const CpuUtil &info) { return info.user_utilization_; });
std::vector<int8_t> sys_util;
std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(sys_util),
[&](const CpuUtil &info) { return info.sys_utilization_; });
std::vector<int8_t> io_util;
std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(io_util),
[&](const CpuUtil &info) { return info.io_utilization_; });
std::vector<int8_t> idle_util;
std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(idle_util),
[&](const CpuUtil &info) { return info.idle_utilization_; });
output["device_info"] = {{"user_utilization", user_util},
{"sys_utilization", sys_util},
{"io_utilization", io_util},
{"idle_utilization", idle_util},
{"runable_processes", running_process_},
{"context_switch_count", context_switch_count_}};
// Discard the content of the file when opening.
std::ofstream os(file_path, std::ios::trunc);
os << output;
MS_LOG(INFO) << "Save device CPU success.";
return Status::OK();
}
Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id,
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> *op_stat) {
pid_t pid = 0;
#if defined(USING_LINUX)
pid = syscall(SYS_getpid);
#endif
std::string stat_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(thread_id) + "/stat";
// Check whether the file exists first
Path temp_path(stat_path);
if (!temp_path.Exists()) {
(*op_stat)[op_id][thread_id].user_stat_ = 0;
(*op_stat)[op_id][thread_id].sys_stat_ = 0;
return Status(StatusCode::kFileNotExist);
}
std::ifstream file(stat_path);
if (!file.is_open()) {
MS_LOG(WARNING) << "Open CPU file failed when collect CPU information";
return Status::OK();
}
std::string str;
getline(file, str);
uint64_t utime;
uint64_t stime;
if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime,
&stime) == EOF) {
file.close();
return Status(StatusCode::kUnexpectedError, "Get device CPU failed.");
}
file.close();
(*op_stat)[op_id][thread_id].user_stat_ = utime;
(*op_stat)[op_id][thread_id].sys_stat_ = stime;
return Status::OK();
}
Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) {
std::ifstream file("/proc/stat");
if (!file.is_open()) {
MS_LOG(WARNING) << "Open CPU file failed when collect CPU information";
return Status::OK();
}
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
EOF) {
file.close();
return Status(StatusCode::kUnexpectedError, "Get device CPU failed.");
}
file.close();
*total_stat = user + nice + sys + idle + iowait + irq + softirq;
return Status::OK();
}
Status OperatorCpu::Collect(ExecutionTree *tree) {
if (first_collect_) {
for (auto iter = tree->begin(); iter != tree->end(); ++iter) {
id_count++;
}
#if defined(USING_LINUX)
cpu_processor_num = get_nprocs_conf();
#endif
}
// Obtain the op and thread mapping
op_thread.clear();
List<Task> allTasks = tree->AllTasks()->GetTask();
for (auto &task1 : allTasks) {
int32_t op_id = task1.get_operator_id();
op_thread[op_id].emplace_back(task1.get_linux_id());
}
// add process id into op_thread
if (!fetched_all_process) {
{
py::gil_scoped_acquire gil_acquire;
py::module ds = py::module::import("mindspore.dataset.engine.datasets");
py::tuple process_info = ds.attr("_get_operator_process")();
py::dict sub_process = py::reinterpret_borrow<py::dict>(process_info[0]);
fetched_all_process = py::reinterpret_borrow<py::bool_>(process_info[1]);
// parse dict value
op_process = toIntMap(sub_process);
BaseCpu::op_process_shared = op_process;
BaseCpu::fetched_all_process_shared = fetched_all_process;
}
// Check whether there is a device_queue operator; if so, the operator id may need to be increased by one. For now, use it directly.
for (auto item : op_process) {
if (!item.second.empty()) {
if (op_thread.find(item.first) != op_thread.end()) {
op_thread[item.first].insert(op_thread[item.first].end(), item.second.begin(), item.second.end());
} else {
op_thread[item.first] = item.second;
}
}
}
}
uint64_t total_stat_;
RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
std::vector<CpuOpUtil> cpu_step_util_;
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> op_stat_;
if (!first_collect_) {
// obtain all the op id in current tasks
std::vector<int32_t> total_op_id;
for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) {
total_op_id.emplace_back(iter->first);
}
// iter all the op, and obtain the CPU utilization of each operator
for (auto op_id = -1; op_id < id_count; op_id++) {
float user_util = 0, sys_util = 0;
auto iter = std::find(total_op_id.begin(), total_op_id.end(), op_id);
if (iter != total_op_id.end()) {
for (auto thread_id : op_thread[op_id]) {
if (ParseCpuInfo(op_id, thread_id, &op_stat_) == Status::OK()) {
user_util += (op_stat_[op_id][thread_id].user_stat_ - pre_op_stat_[op_id][thread_id].user_stat_) * 1.0 /
(total_stat_ - pre_total_stat_) * 100;
sys_util += (op_stat_[op_id][thread_id].sys_stat_ - pre_op_stat_[op_id][thread_id].sys_stat_) * 1.0 /
(total_stat_ - pre_total_stat_) * 100;
}
}
}
CpuOpUtil info;
info.op_id = op_id;
info.sys_utilization_ = sys_util;
info.user_utilization_ = user_util;
cpu_step_util_.emplace_back(info);
}
cpu_op_util_.emplace_back(cpu_step_util_);
} else {
// mainly obtain the initial CPU execution time on the first collection
for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) {
int32_t op_id = iter->first;
for (auto thread_id : iter->second) {
ParseCpuInfo(op_id, thread_id, &op_stat_);
}
}
}
// copy current op_stat into pre_op_stat
pre_op_stat_ = op_stat_;
pre_total_stat_ = total_stat_;
first_collect_ = false;
return Status::OK();
}
Status OperatorCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
json output;
if (path.Exists()) {
MS_LOG(DEBUG) << file_path << " already exists.";
std::ifstream file(file_path);
file >> output;
}
uint8_t index = 0;
json OpWriter;
for (auto op_id = -1; op_id < id_count; op_id++) {
std::vector<uint16_t> user_util;
std::vector<uint16_t> sys_util;
std::transform(
cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(user_util),
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num); });
std::transform(
cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(sys_util),
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num); });
json per_op_info = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
{"op_id", op_id}};
OpWriter.emplace_back(per_op_info);
index++;
}
output["op_info"] = OpWriter;
// Discard the content of the file when opening.
std::ofstream os(file_path, std::ios::trunc);
os << output;
MS_LOG(INFO) << "Save operator CPU success.";
return Status::OK();
}
Status ProcessCpu::ParseCpuInfo() {
uint64_t total_stat_;
RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_));
if (!pre_fetched_state) {
process_id.clear();
pid_t main_pid = 0;
#if defined(USING_LINUX)
main_pid = syscall(SYS_getpid);
#endif
process_id.emplace_back(main_pid);
op_process = BaseCpu::op_process_shared;
fetched_all_process = BaseCpu::fetched_all_process_shared;
for (auto item : op_process) {
for (auto id : item.second) {
process_id.emplace_back(id);
}
}
}
float user_util = 0, sys_util = 0;
for (auto pid : process_id) {
std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
std::ifstream file(stat_path);
if (!file.is_open()) {
MS_LOG(WARNING) << "Open CPU file failed when collect CPU information";
continue;
}
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0;
if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user,
&sys) == EOF) {
file.close();
return Status(StatusCode::kUnexpectedError, "Get device CPU failed.");
}
file.close();
// Calculate the utilization from the second sampling
if (!first_collect_ && (pre_process_stat_.find(pid) != pre_process_stat_.end())) {
user_util += (user - pre_process_stat_[pid].user_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
sys_util += (sys - pre_process_stat_[pid].sys_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100;
}
pre_process_stat_[pid].user_stat_ = user;
pre_process_stat_[pid].sys_stat_ = sys;
}
if (!first_collect_) {
CpuProcessUtil info;
info.user_utilization_ = user_util;
info.sys_utilization_ = sys_util;
process_util_.emplace_back(info);
}
pre_total_stat_ = total_stat_;
first_collect_ = false;
pre_fetched_state = fetched_all_process;
return Status::OK();
}
Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) {
std::ifstream file("/proc/stat");
if (!file.is_open()) {
MS_LOG(WARNING) << "Open CPU file failed when collect CPU information";
return Status::OK();
}
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
EOF) {
file.close();
return Status(StatusCode::kUnexpectedError, "Get device CPU failed.");
}
file.close();
*total_stat = user + nice + sys + idle + iowait + irq + softirq;
return Status::OK();
}
Status ProcessCpu::Collect(ExecutionTree *tree) {
if (first_collect_) {
#if defined(USING_LINUX)
cpu_processor_num = get_nprocs_conf();
#endif
}
RETURN_IF_NOT_OK(ParseCpuInfo());
return Status::OK();
}
Status ProcessCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
json output;
if (path.Exists()) {
MS_LOG(DEBUG) << file_path << " already exists.";
std::ifstream file(file_path);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}
std::vector<int16_t> user_util;
std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(user_util),
[&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num); });
std::vector<int16_t> sys_util;
std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(sys_util),
[&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num); });
output["process_info"] = {{"user_utilization", user_util}, {"sys_utilization", sys_util}};
output["cpu_processor_num"] = cpu_processor_num;
// Discard the content of the file when opening.
std::ofstream os(file_path, std::ios::trunc);
os << output;
MS_LOG(INFO) << "Save process CPU success.";
return Status::OK();
}
Status CpuSampling::CollectTimeStamp() {
time_stamp_.emplace_back(ProfilingTime::GetCurMilliSecond());
return Status::OK();
}
// Sample action
Status CpuSampling::Sample() {
// Collect cpu information
for (auto cpu : cpu_) {
RETURN_IF_NOT_OK(cpu->Collect(this->tree_));
}
// Collect time stamp
RETURN_IF_NOT_OK(CollectTimeStamp());
return Status::OK();
}
Status CpuSampling::SaveTimeStampToFile() {
// Save time stamp to json file
// If the file already exists, simply add the data to the corresponding field.
Path path = Path(file_path_);
json output;
if (path.Exists()) {
std::ifstream file(file_path_);
file >> output;
}
output["time_stamp"] = time_stamp_;
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}
Status CpuSampling::SaveSamplingItervalToFile() {
// If the file already exists, simply add the data to the corresponding field.
Path path = Path(file_path_);
json output;
if (path.Exists()) {
std::ifstream file(file_path_);
file >> output;
}
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}
// Save profiling data to file
Status CpuSampling::SaveToFile() {
// Save time stamp to json file
RETURN_IF_NOT_OK(SaveTimeStampToFile());
// Save sampling interval to json file
RETURN_IF_NOT_OK(SaveSamplingItervalToFile());
// Save cpu information to json file
for (auto cpu : cpu_) {
RETURN_IF_NOT_OK(cpu->SaveToFile(file_path_));
}
return Status::OK();
}
Status CpuSampling::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json")).toString();
std::shared_ptr<DeviceCpu> device_cpu = std::make_shared<DeviceCpu>();
std::shared_ptr<OperatorCpu> operator_cpu = std::make_shared<OperatorCpu>();
std::shared_ptr<ProcessCpu> process_cpu = std::make_shared<ProcessCpu>();
cpu_.push_back(device_cpu);
cpu_.push_back(operator_cpu);
cpu_.push_back(process_cpu);
return Status::OK();
}
Status CpuSampling::ChangeFileMode() {
if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
std::string err_str = "Change file mode failed," + file_path_;
return Status(StatusCode::kUnexpectedError, err_str);
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore
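The device-level utilization computed in DeviceCpu::ParseCpuInfo is the delta of each counter divided by the delta of the total, rounded to the nearest percent. A standalone sketch of that formula with two hypothetical /proc/stat samples (the real code applies the same formula to the user, sys, io and idle counters alike):

#include <cmath>
#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical jiffy counters from two consecutive reads of the "cpu" line in /proc/stat.
  uint64_t prev_user = 10000, prev_total = 40000;
  uint64_t cur_user = 10300, cur_total = 41000;
  // Share of the elapsed CPU time spent in user mode, rounded to the nearest integer percent.
  uint8_t user_utilization =
      static_cast<uint8_t>(std::floor((cur_user - prev_user) * 1.0 / (cur_total - prev_total) * 100 + 0.5));
  std::printf("user utilization: %u%%\n", static_cast<unsigned>(user_utilization));  // prints 30%
  return 0;
}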

View File

@@ -0,0 +1,201 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H
#define MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
namespace mindspore {
namespace dataset {
class ExecutionTree;
// CPU information from /proc/stat or /proc/pid/stat file
typedef struct CpuStat_s {
uint64_t user_stat_;
uint64_t sys_stat_;
uint64_t io_stat_;
uint64_t idle_stat_;
uint64_t total_stat_;
} CpuStat;
// Cpu utilization
typedef struct CpuInfo_s {
uint8_t user_utilization_;
uint8_t sys_utilization_;
uint8_t io_utilization_;
uint8_t idle_utilization_;
} CpuUtil;
// CPU utilization of operator
typedef struct CpuOpInfo_s {
float user_utilization_;
float sys_utilization_;
int32_t op_id;
} CpuOpUtil;
// CPU utilization of process
typedef struct CpuProcessInfo_s {
float user_utilization_;
float sys_utilization_;
} CpuProcessUtil;
// CPU stat of operator
typedef struct CpuOpStat_s {
uint64_t user_stat_;
uint64_t sys_stat_;
} CpuOpStat;
class BaseCpu {
public:
BaseCpu();
~BaseCpu() = default;
// Collect CPU information
virtual Status Collect(ExecutionTree *tree) = 0;
virtual Status SaveToFile(const std::string &file_path) = 0;
protected:
std::vector<CpuUtil> cpu_util_;
CpuStat pre_cpu_stat_;
static bool fetched_all_process_shared;
static std::unordered_map<int32_t, std::vector<pid_t>> op_process_shared;
bool fetched_all_process = false;
bool pre_fetched_state = false;
std::unordered_map<int32_t, std::vector<pid_t>> op_process;
int32_t cpu_processor_num;
};
// Collect device CPU information
class DeviceCpu : public BaseCpu {
public:
DeviceCpu() : pre_running_process_(0), pre_context_switch_count_(0), first_collect_(true) {}
~DeviceCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
private:
// Get CPU information, including user/sys/idle/io utilization
Status ParseCpuInfo(const std::string &str);
// Get context switch count
Status ParseCtxt(const std::string &str);
// Get running process count
Status ParseRunningProcess(const std::string &str);
std::vector<uint32_t> running_process_;
std::vector<uint64_t> context_switch_count_;
uint32_t pre_running_process_;
uint64_t pre_context_switch_count_;
bool first_collect_;
};
// Collect operator CPU information
class OperatorCpu : public BaseCpu {
public:
OperatorCpu() : first_collect_(true) {}
~OperatorCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
private:
// Get CPU information, including user/sys/idle/io utilization
Status ParseCpuInfo(int32_t op_id, int64_t thread_id,
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> *op_stat);
// Get the total CPU time of device
Status GetTotalCpuTime(uint64_t *total_stat);
// Store the CPU utilization of each operator
std::vector<std::vector<CpuOpUtil>> cpu_op_util_;
bool first_collect_;
// Store the operator id and its corresponding threads.
std::unordered_map<int32_t, std::vector<pid_t>> op_thread;
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> pre_op_stat_;
uint64_t pre_total_stat_;
int32_t id_count = 0;
};
// Collect process CPU information
class ProcessCpu : public BaseCpu {
public:
ProcessCpu() : first_collect_(true) {}
~ProcessCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
private:
// Get CPU information, including user/sys/idle/io utilization
Status ParseCpuInfo();
// Get the total CPU time of device
Status GetTotalCpuTime(uint64_t *total_stat);
bool first_collect_;
std::vector<CpuProcessUtil> process_util_;
uint64_t pre_total_stat_;
std::unordered_map<int64_t, CpuOpStat> pre_process_stat_;
std::vector<pid_t> process_id;
};
// Sampling CPU information
// It supports JSON serialization for external usage.
class CpuSampling : public Sampling {
using TimeStamp = std::vector<uint32_t>;
public:
explicit CpuSampling(ExecutionTree *tree) : tree_(tree) {}
~CpuSampling() = default;
// Driver function for CPU sampling.
// This function samples the CPU information of device/process/op
Status Sample() override;
std::string Name() const override { return kCpuSamplingName; }
// Save sampling data to file
// @return Status - The error code returned
Status SaveToFile() override;
Status Init(const std::string &dir_path, const std::string &device_id) override;
// Change file mode after saving CPU data
Status ChangeFileMode() override;
private:
Status CollectTimeStamp();
Status SaveTimeStampToFile();
Status SaveSamplingItervalToFile();
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
std::vector<std::shared_ptr<BaseCpu>> cpu_; // CPU information of device/process/op
TimeStamp time_stamp_; // Time stamp
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H
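
For reference, a minimal standalone sketch (not part of this patch) of how a /proc/stat "cpu" line could be mapped onto the CpuStat fields declared above; the field grouping (folding nice into user, and irq/softirq into sys) is an assumption, not necessarily what cpu_sampling.cc does:

#include <cstdint>
#include <sstream>
#include <string>

struct CpuStatSketch {
  uint64_t user_stat_;
  uint64_t sys_stat_;
  uint64_t io_stat_;
  uint64_t idle_stat_;
  uint64_t total_stat_;
};

// Parse one "cpu ..." line from /proc/stat into the sketch struct.
// Assumed layout: "cpu user nice system idle iowait irq softirq ..."
bool ParseProcStatLine(const std::string &line, CpuStatSketch *stat) {
  std::istringstream iss(line);
  std::string label;
  uint64_t user = 0, nice = 0, sys = 0, idle = 0, iowait = 0, irq = 0, softirq = 0;
  if (!(iss >> label >> user >> nice >> sys >> idle >> iowait >> irq >> softirq) || label != "cpu") {
    return false;
  }
  stat->user_stat_ = user + nice;          // assumption: nice time counted as user
  stat->sys_stat_ = sys + irq + softirq;   // assumption: irq/softirq counted as sys
  stat->io_stat_ = iowait;
  stat->idle_stat_ = idle;
  stat->total_stat_ = stat->user_stat_ + stat->sys_stat_ + stat->io_stat_ + stat->idle_stat_;
  return true;
}

Per-interval utilization would then come from the delta between two consecutive samples divided by the delta of total_stat_, which is presumably why BaseCpu keeps pre_cpu_stat_.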

View File

@ -24,7 +24,7 @@ namespace mindspore {
namespace dataset {
Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num,
const int32_t value) {
const int32_t value, const uint64_t time_stamp) {
// Format: "type extra-info batch-num value"
// type: 0: time, 1: connector size
// extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
@ -36,7 +36,7 @@ Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_in
// 0 0 20 10 - The 20th batch took 10ms to get data from pipeline.
// 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64.
std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " +
std::to_string(value);
std::to_string(value) + " " + std::to_string(time_stamp);
value_.emplace_back(data);
return Status::OK();
}

View File

@ -33,7 +33,8 @@ class DatasetIteratorTracing : public Tracing {
// Record tracing data
// @return Status The status code returned
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value);
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value,
const uint64_t time_stamp);
std::string Name() const override { return kDatasetIteratorTracingName; };

View File

@ -24,7 +24,7 @@ namespace mindspore {
namespace dataset {
Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num,
const int32_t value) {
const int32_t value, const uint64_t time_stamp) {
// Format: "type extra-info batch-num value"
// type: 0: time, 1: connector size
// extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
@ -32,11 +32,12 @@ Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info,
// batch-num: batch number
// value: if type is 0 - value is time(ms)
// if type is 1 - value is connector size
// time-stamp: the time (in ms) at which the record was taken
// Examples:
// 0 0 20 10 - The 20th batch took 10ms to get data from pipeline.
// 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64.
// 0 0 20 10 xxx - The 20th batch took 10ms to get data from the pipeline.
// 1 64 20 5 xxx - Connector size is 5 when getting the 20th batch. Connector capacity is 64.
std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " +
std::to_string(value);
std::to_string(value) + " " + std::to_string(time_stamp);
value_.emplace_back(data);
return Status::OK();
}
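
As a hedged illustration of the extended record format (not code from this patch), a serialized line such as "1 64 20 5 1612166624000" could be read back like this:

#include <cstdint>
#include <sstream>
#include <string>

struct TraceRecordSketch {
  int32_t type;         // 0: time, 1: connector size
  int32_t extra_info;   // time sub-type, or connector capacity
  int32_t batch_num;    // batch number
  int32_t value;        // time in ms, or connector size
  uint64_t time_stamp;  // ms timestamp appended by this change
};

// Parse one tracing line of the form "type extra-info batch-num value time-stamp".
bool ParseTraceLine(const std::string &line, TraceRecordSketch *rec) {
  std::istringstream iss(line);
  return static_cast<bool>(iss >> rec->type >> rec->extra_info >> rec->batch_num >> rec->value >> rec->time_stamp);
}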

View File

@ -33,7 +33,8 @@ class DeviceQueueTracing : public Tracing {
// Record tracing data
// @return Status The status code returned
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value);
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value,
const uint64_t time_stamp);
std::string Name() const override { return kDeviceQueueTracingName; };

View File

@ -24,6 +24,7 @@
#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/engine/perf/connector_size.h"
#include "minddata/dataset/engine/perf/connector_throughput.h"
#include "minddata/dataset/engine/perf/cpu_sampling.h"
#include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"
#include "minddata/dataset/util/log_adapter.h"
@ -79,6 +80,9 @@ Status ProfilingManager::Initialize() {
std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));
std::shared_ptr<Sampling> cpu_sampling = std::make_shared<CpuSampling>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampling));
return Status::OK();
}
@ -168,7 +172,7 @@ Status ProfilingManager::ChangeFileMode() {
return Status::OK();
}
int64_t ProfilingTime::GetCurMilliSecond() {
uint64_t ProfilingTime::GetCurMilliSecond() {
// because cpplint does not allow using namespace
using std::chrono::duration_cast;
using std::chrono::milliseconds;
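
The rest of the body lies outside this hunk; a plausible chrono-based sketch of such a helper (the exact clock used in profiling.cc may differ) would be:

#include <chrono>
#include <cstdint>

// Standalone sketch only: milliseconds since the epoch as an unsigned value.
uint64_t GetCurMilliSecondSketch() {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::system_clock;
  return static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
}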

View File

@ -33,6 +33,7 @@ const char kDeviceQueueTracingName[] = "Device_Queue_Tracing";
const char kDatasetIteratorTracingName[] = "Dataset_Iterator_Tracing";
const char kConnectorSizeSamplingName[] = "Connector_Size_Sampling";
const char kConnectorThroughputSamplingName[] = "Connector_Throughput_Sampling";
const char kCpuSamplingName[] = "Cpu_Sampling";
// Profiling is the base class for a basic unit of profiling action
// This base class encapsulates the serialization output logic
@ -150,7 +151,7 @@ enum ProfilingTimeSubType {
class ProfilingTime {
public:
static int64_t GetCurMilliSecond();
static uint64_t GetCurMilliSecond();
};
} // namespace dataset
} // namespace mindspore

View File

@ -246,8 +246,10 @@ Status TreeAdapter::GetNext(TensorRow *row) {
RETURN_IF_NOT_OK(cur_db_->PopRow(row));
// Record profiling info
if (tracing_ != nullptr) {
uint64_t end_time = ProfilingTime::GetCurMilliSecond();
cur_batch_num_++;
RETURN_IF_NOT_OK(tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_));
RETURN_IF_NOT_OK(
tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, end_time));
}
return Status::OK();
}

View File

@ -44,8 +44,8 @@ void Task::operator()() {
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
native_handle_ = pthread_self();
thread_id_ = syscall(SYS_gettid);
#endif
try {
// Previously there was a timing hole where the thread was spawned but hit an error immediately before we could set
// the TaskGroup pointer and register. We move the registration logic to here (after we spawn) so we can
@ -105,8 +105,9 @@ Status Task::GetTaskErrorIfAny() const {
}
}
Task::Task(const std::string &myName, const std::function<Status()> &f)
Task::Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id)
: my_name_(myName),
operator_id_(operator_id),
rc_(),
fnc_obj_(f),
task_group_(nullptr),

View File

@ -18,6 +18,7 @@
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
#include <pthread.h>
#include <sys/syscall.h>
#endif
#include <chrono>
#include <exception>
@ -48,7 +49,7 @@ class Task : public IntrpResource {
enum class WaitFlag : int { kBlocking, kNonBlocking };
Task(const std::string &myName, const std::function<Status()> &f);
Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id = -1);
// Future objects are not copyable.
Task(const Task &) = delete;
@ -87,8 +88,12 @@ class Task : public IntrpResource {
std::thread::id get_id() { return id_; }
pid_t get_linux_id() { return thread_id_; }
std::string MyName() const { return my_name_; }
int32_t get_operator_id() { return operator_id_; }
// An operator used by std::find
bool operator==(const Task &other) const { return (this == &other); }
@ -107,6 +112,8 @@ class Task : public IntrpResource {
private:
mutable std::mutex mux_;
std::string my_name_;
int32_t operator_id_;
pid_t thread_id_;
Status rc_;
WaitPost wp_;
// Task need to provide definition for this function. It

View File

@ -25,7 +25,7 @@ TaskManager *TaskManager::instance_ = nullptr;
std::once_flag TaskManager::init_instance_flag_;
// This takes the same parameters as the Task constructor.
Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
Task **task) {
Task **task, int32_t operator_id) {
// We need to block destructor coming otherwise we will deadlock. We will grab the
// stateLock in shared allowing CreateAsyncTask to run concurrently.
SharedLock stateLck(&state_lock_);
@ -33,7 +33,7 @@ Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::funct
if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
return Status(StatusCode::kInterrupted, __LINE__, __FILE__, "TaskManager is shutting down");
}
RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task));
RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
if (vg == nullptr) {
RETURN_STATUS_UNEXPECTED("TaskGroup is null");
}
@ -248,7 +248,8 @@ void TaskManager::ReturnFreeTask(Task *p) noexcept {
}
}
Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p) {
Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
int32_t operator_id) {
if (p == nullptr) {
RETURN_STATUS_UNEXPECTED("p is null");
}
@ -262,18 +263,19 @@ Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<
}
}
if (q) {
new (q) Task(my_name, f);
new (q) Task(my_name, f, operator_id);
} else {
std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
Status rc;
q = new (&rc, mp) Task(my_name, f);
q = new (&rc, mp) Task(my_name, f, operator_id);
RETURN_IF_NOT_OK(rc);
}
*p = q;
return Status::OK();
}
Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask) {
Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
int32_t operator_id) {
auto pMytask = TaskManager::FindMe();
// We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
// stateLock in shared allowing CreateAsyncTask to run concurrently.
@ -293,7 +295,7 @@ Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::functio
return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kInterrupted);
}
}
RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask));
RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
if (ppTask) {
*ppTask = pTask;
}
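
The reason operator_id is threaded through Task, TaskManager and TaskGroup is so that each worker thread's Linux tid can later be attributed to the operator that spawned it. A self-contained sketch of that idea (the names below are illustrative, not MindSpore APIs):

#include <sys/syscall.h>
#include <unistd.h>
#include <cstdint>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

std::mutex g_map_mutex;
std::unordered_map<int32_t, std::vector<pid_t>> g_op_threads;  // operator id -> worker tids

void WorkerBody(int32_t operator_id) {
  // Same syscall Task::operator() uses to capture thread_id_.
  pid_t tid = static_cast<pid_t>(syscall(SYS_gettid));
  {
    std::lock_guard<std::mutex> lock(g_map_mutex);
    g_op_threads[operator_id].push_back(tid);
  }
  // A real worker loop would run here; a CPU sampler can then read
  // /proc/<pid>/task/<tid>/stat and charge the usage to operator_id.
}

int main() {
  std::thread t0(WorkerBody, 0), t1(WorkerBody, 1);
  t0.join();
  t1.join();
  return 0;
}

OperatorCpu's op_thread map in cpu_sampling.h plays the role of g_op_threads here, presumably filled from the registered tasks rather than a global.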

View File

@ -75,7 +75,8 @@ class TaskManager : public Service {
// API
// This takes the same parameters as the Task constructor. Take a look
// at test-thread.cc for usage.
Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg, Task **);
Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg, Task **,
int32_t operator_id = -1);
// Same usage as boot thread group
Status join_all();
@ -100,7 +101,7 @@ class TaskManager : public Service {
void ReturnFreeTask(Task *p) noexcept;
Status GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p);
Status GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p, int32_t operator_id = -1);
Status WatchDog();
@ -129,7 +130,8 @@ class TaskGroup : public Service {
friend class Task;
friend class TaskManager;
Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **pTask = nullptr);
Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **pTask = nullptr,
int32_t operator_id = -1);
void interrupt_all() noexcept;
@ -137,6 +139,8 @@ class TaskGroup : public Service {
int size() const noexcept { return grp_list_.count; }
List<Task> GetTask() const noexcept { return grp_list_; }
Status DoServiceStart() override { return Status::OK(); }
Status DoServiceStop() override;

View File

@ -107,6 +107,25 @@ def zip(datasets):
return ZipDataset(datasets)
def _get_operator_process():
"""
Inner implemented method, mainly for passing sub-process id in C layer
Returns:
dict, mapping dict of operator id and corresponding process id.
"""
global _OP_PROCESS
process_info = _OP_PROCESS
op_process = dict()
keys = process_info.keys()
fetched_all = True
for key in keys:
op_process[key] = list(process_info[key][1])
item_full = (len(process_info[key][1]) == process_info[key][0])
fetched_all = fetched_all and item_full
return op_process, fetched_all
class Dataset:
"""
Abstract class to represent a dataset in DataEngine's data pipeline.
@ -156,11 +175,47 @@ class Dataset:
parent = self.parent
self.parent = []
dataset = copy.deepcopy(self)
global _OP_NAME
_OP_NAME = Dataset._get_operator_id(dataset)
ir_tree = dataset.parse_tree()
self.parent = parent
_init_device_info()
return ir_tree, dataset
@staticmethod
def _get_operator_id(dataset):
"""
Internal method to iterate the tree and obtain op_id of each operator.
Returns:
Dataset, the root dataset of the tree.
"""
op_name = dict()
generator_process = dict()
op_name[str(dataset)] = 0
op_id = 1
def process_name(datasets, operator_id):
if not datasets:
return 0
temp = []
for item in datasets:
for d in item.children:
temp.append(d)
op_name[str(d)] = operator_id
if isinstance(d, GeneratorDataset) and d.sample_fn:
if d.sample_fn.pid:
generator_process[operator_id] = [d.num_parallel_workers, set(d.sample_fn.pid)]
operator_id = operator_id + 1
return process_name(temp, operator_id)
process_name([dataset], op_id)
if generator_process:
global _OP_PROCESS
_OP_PROCESS.update(generator_process)
return op_name
def parse_tree(self):
"""
Internal method to parse the API tree into an IR tree.
@ -2202,7 +2257,8 @@ class ShuffleDataset(Dataset):
# Pyfunc collection for multiprocess pyfunc
# This global variable will only be used within subprocesses
_GLOBAL_PYFUNC_LIST = []
_OP_NAME = dict()
_OP_PROCESS = dict()
# Pyfunc worker init function
# Python multiprocessing library forbid sending lambda function through pipe.
@ -2215,6 +2271,9 @@ def _pyfunc_worker_init(pyfunc_list):
# Pyfunc worker execution function
# All exceptions will be raised to main processes
def _pyfunc_worker_exec(index, *args):
"""
Internal function to call a certain pyfunc in a Python subprocess.
"""
try:
return _GLOBAL_PYFUNC_LIST[index](*args)
except KeyboardInterrupt:
@ -3509,6 +3568,7 @@ class SamplerFn:
self.multi_process = multi_process
self.joined = False
self.ppid = os.getpid()
self.pid = []
# Event for end of epoch
if multi_process is True:
self.eof = multiprocessing.Event()
@ -3523,6 +3583,7 @@ class SamplerFn:
# which may cause deadlock. Therefore, the subprocess startup is performed in the initialization phase.
# In this phase, the main process is not locked.
worker.start()
self.pid.append(worker.pid)
else:
worker = _GeneratorWorkerMt(dataset, self.eof)
worker.daemon = True
@ -3847,6 +3908,7 @@ class GeneratorDataset(MappableDataset):
new_op.source_len = self.source_len
new_op.saved_output_types = self.saved_output_types
new_op.saved_output_shapes = self.saved_output_shapes
sample_fn = None
if hasattr(self, "__total_batch__"):
new_op.__total_batch__ = self.__total_batch__
if new_op.sampler is not None and hasattr(self.source, "__getitem__"):
@ -3870,9 +3932,11 @@ class GeneratorDataset(MappableDataset):
new_op.source = (lambda: _py_sampler_fn_mp(new_op.sample_ids, new_op.num_samples, sample_fn))
else:
new_op.source = (lambda: _py_sampler_fn(new_op.sample_ids, new_op.num_samples, self.source))
new_op.sample_fn = sample_fn
else:
try:
new_op.sampler = None
new_op.sample_fn = sample_fn
iter(self.source)
except TypeError:
# Use generator function if input callable