fix connector size order + autotune thread + minor changes

This commit is contained in:
mohammad 2021-11-23 11:46:48 -05:00
parent ec81de6701
commit d6c50e0f26
3 changed files with 31 additions and 27 deletions

View File

@ -64,7 +64,10 @@ Status CallbackManager::EpochBegin(const CallbackParam &cb_param) {
RETURN_OK_IF_TRUE(!enabled_);
RETURN_UNEXPECTED_IF_NULL(op_);
RETURN_IF_NOT_OK(op_->WaitForWorkers());
// only wait if there are callbacks to call
if (epoch_begin_indices_.size() > 0) {
RETURN_IF_NOT_OK(op_->WaitForWorkers());
}
// Now do the actual callback
for (size_t ind : epoch_begin_indices_) {

View File

@ -41,7 +41,11 @@ Status AutoTune::Main() {
MS_LOG(ERROR) << "AutoTune failed and will exit with the following error: " << rc;
break;
}
RETURN_IF_NOT_OK(cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval()));
rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval());
// The thread may be interrupted while waiting because the tree is terminating; that is not an error and must not be reported as one.
if (rc.IsError() && rc != StatusCode::kMDInterrupted) {
return rc;
}
}
RETURN_IF_NOT_OK(profiling_manager_->Stop());
MS_LOG(INFO) << "AutoTune thread is finished.";
@ -60,7 +64,7 @@ Status AutoTune::CollectOpsInfo() {
for (auto itr = tree->begin(); itr != tree->end(); ++itr) {
ops_[itr->id()] = itr.get();
// get all parallel ops (num_workers>0), including leaf nodes (no children)
if (itr->NumWorkers() > 0 && itr->Children().size() > 0) {
if (itr->NumWorkers() > 0) {
parallel_ops_ids_.push_back(itr->id());
}
}
@ -71,8 +75,6 @@ Status AutoTune::CollectOpsInfo() {
if (parallel_ops_ids_.size() != 0) {
CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[parallel_ops_ids_.size() - 1] != 0,
"Non-sink pipeline, root node is a ParallelOp. AutoTune is not supported.");
CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[0] != leaf_op_id_, "Leaf Operator is added to ParallelOps list.");
}
return Status::OK();
@ -225,20 +227,14 @@ Status AutoTune::Analyse() {
std::map<int32_t, double> ops_cpu_util;
RETURN_IF_NOT_OK(GetOpsCpuUtil(&ops_cpu_util));
// check leaf
if (ops_queue_util[leaf_op_id_] < LEAF_QUEUE_THRESHOLD) {
MS_LOG(WARNING) << "Leaf op (" << ops_[leaf_op_id_]->NameWithID()
<< ") queue utilization: " << ops_queue_util[leaf_op_id_] * TO_PERCENT << "% < "
<< LEAF_QUEUE_THRESHOLD * TO_PERCENT << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(leaf_op_id_, ops_num_workers[leaf_op_id_],
ops_num_workers[leaf_op_id_] + INCREMENT_WORKER));
}
// check parallel ops in loop
for (const auto &op_id : parallel_ops_ids_) {
// op specifics
double output_queue_util = ops_queue_util[op_id];
double input_queue_util = ops_queue_util[op_id + 1];
double input_queue_util = 1; // assume that leaf op has 100% input queue util
if (op_id + 1 < ops_.size()) {
input_queue_util = ops_queue_util[op_id + 1];
}
double cpu_util = ops_cpu_util[op_id];
int32_t num_workers = ops_num_workers[op_id];
CHECK_FAIL_RETURN_UNEXPECTED(num_workers != 0, "ParallelOp with num_workers=0");
@ -252,19 +248,18 @@ Status AutoTune::Analyse() {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID()
<< ") is slow, input connector utilization=" << input_queue_util
<< ", output connector utilization=" << output_queue_util << ", diff= " << queue_diff << " > "
<< INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << "threshold.";
<< INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << " threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
} else {
if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting high average worker cpu utilization "
<< (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
} else if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting low average worker cpu utilization "
<< (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, queue_capacity + INCREMENT_QUEUE_SIZE));
}
} else if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting high average worker cpu utilization "
<< (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
}
if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting low average worker cpu utilization "
<< (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, queue_capacity + INCREMENT_QUEUE_SIZE));
}
}
return Status::OK();

View File

@ -35,6 +35,9 @@ Status ConnectorSize::Sample() {
Qrow cur_row;
(void)std::transform(tree_->begin(), tree_->end(), std::back_inserter(cur_row),
[](DatasetOp &op) { return op.ConnectorSize(); });
// Tree Iterator is in PostOrder (leaf first, e.g., 3,2,1)
// reverse the order of the vector to get the root first.
std::reverse(cur_row.begin(), cur_row.end());
std::lock_guard<std::mutex> guard(lock_);
// Push new row of sample
sample_table_.push_back(cur_row);
@ -103,6 +106,9 @@ Status ConnectorSize::Init() {
json json_node = ParseOpInfo(node);
initial_nodes_data["op_info"].push_back(json_node);
}
// Tree Iterator is in PostOrder (leaf first, e.g., 3,2,1)
// reverse the order of the vector to get the root first.
std::reverse(initial_nodes_data["op_info"].begin(), initial_nodes_data["op_info"].end());
return Status::OK();
}