!34458 Fix multiple issues after switching GPU cases

Merge pull request !34458 from ZPaC/adapt-for-ps-test-case
This commit is contained in:
i-robot 2022-05-18 06:51:36 +00:00 committed by Gitee
commit 47942bae80
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
9 changed files with 53 additions and 10 deletions

View File

@ -32,7 +32,7 @@ using ps::core::GeneralResponseMsg;
using ps::core::NodeCommand;
// The timeout in milliseconds for one lookup.
constexpr uint32_t kDefaultLookupTimeout = 300000;
constexpr uint32_t kDefaultLookupTimeout = 60000;
// The time in milliseconds between two lookup operations.
constexpr uint32_t kLookupInterval = 100;

View File

@ -152,6 +152,7 @@ PYBIND11_MODULE(_c_expression, m) {
(void)m.def("real_run_op", &mindspore::pynative::RealRunOp, "Run op pynatively.");
(void)m.def("reset_op_id", &mindspore::pipeline::ResetOpId, "Reset Operator Id");
(void)m.def("reset_op_id_with_offset", &mindspore::pipeline::ResetOpIdWithOffset, "Reset Operator Id With Offset");
(void)m.def("init_hccl", &mindspore::pipeline::InitHccl, "Init Hccl");
(void)m.def("finalize_hccl", &mindspore::pipeline::FinalizeHccl, "Finalize Hccl");
(void)m.def("get_hccl_rank_id", &mindspore::pipeline::GetHcclRankId, "Get Hccl Rank Id");

View File

@ -1444,6 +1444,7 @@ std::string GetJitLevel() {
}
void ResetOpId() { mindspore::id_generator::reset_id(); }
void ResetOpIdWithOffset() { mindspore::id_generator::reset_id_with_offset(); }
void InitHccl() {
auto ms_context = MsContext::GetInstance();
@ -1605,7 +1606,7 @@ void ClearResAtexit() {
RecordExitStatus();
#if ((defined ENABLE_CPU) && (!defined _WIN32) && !defined(__APPLE__))
if (distributed::cluster::ClusterContext::instance()->initialized()) {
(void)distributed::cluster::ClusterContext::instance()->Finalize();
(void)distributed::cluster::ClusterContext::instance()->Finalize(UINT32_MAX);
} else if (ps::PSContext::instance()->is_ps_mode() && ps::PSContext::instance()->is_worker()) {
if (ps::PsDataPrefetch::GetInstance().cache_enable()) {
ps::ps_cache_instance.Finalize();

View File

@ -169,6 +169,7 @@ py::bool_ VerifyInputSignature(const py::list &input_signature, const py::tuple
bool InitDistribute(const std::map<std::string, std::string> &options);
void ResetOpId();
void ResetOpIdWithOffset();
void InitHccl();
void FinalizeHccl();
uint32_t GetHcclRankId();

View File

@ -374,6 +374,7 @@ SeenNum NewSeenGeneration() {
namespace id_generator {
static mindspore::HashMap<std::string, int> node_ids;
static int offset = 0;
std::string get_id(const AnfNodePtr &node) {
auto type_name = node->type_name();
if (node_ids.find(type_name) == node_ids.end()) {
@ -381,10 +382,21 @@ std::string get_id(const AnfNodePtr &node) {
} else {
node_ids[type_name]++;
}
return std::to_string(node_ids[type_name]);
std::string base_id = std::to_string(node_ids[type_name]);
// An id carrying an offset means the user called reset_id_with_offset() and expects
// subsequently generated ids to restart from 0, prefixed with a distinguishing offset.
if (offset != 0) {
return std::to_string(offset) + "_" + base_id;
}
return base_id;
}
void reset_id() {
  // Drop every per-type counter so id generation restarts from scratch.
  node_ids.clear();
}
void reset_id_with_offset() {
  // Restart the per-type counters, and bump the offset so ids generated after
  // this reset remain distinguishable from those generated before it.
  node_ids.clear();
  ++offset;
}
} // namespace id_generator
auto constexpr kTargetUnDefined = "kTargetUnDefined";
auto constexpr kPrimitiveTarget = "primitive_target";

View File

@ -1161,6 +1161,7 @@ MS_CORE_API SeenNum NewSeenGeneration();
namespace id_generator {
MS_CORE_API std::string get_id(const AnfNodePtr &node);
MS_CORE_API void reset_id();
MS_CORE_API void reset_id_with_offset();
} // namespace id_generator
using TaggedNodeMap = mindspore::HashMap<AnfNodePtr, size_t>;
using TaggedGraph = std::pair<FuncGraphPtr, TaggedNodeMap>;

View File

@ -14,6 +14,7 @@
# ============================================================================
"""comm_helper"""
import os
from mindspore import context
from mindspore.parallel._ps_context import _is_role_pserver, _is_role_sched
from mindspore import log as logger
@ -154,7 +155,6 @@ def _check_mpi_envs():
return True if mpi environment variables have been exported, False otherwise.
"""
import os
ompi_command_env = os.getenv("OMPI_COMMAND")
pmix_rank_env = os.getenv("PMIX_RANK")
if ompi_command_env and pmix_rank_env:
@ -170,8 +170,10 @@ def _not_require_collective_comm_lib():
device_target = context.get_context("device_target")
if device_target == "Ascend":
return _is_role_sched() or _is_role_pserver()
if device_target == "CPU":
return _check_mpi_envs() and (_is_role_sched() or _is_role_pserver())
if device_target == "GPU":
# Environment variable PARALLEL_EXCUTE is set by the test cases and
# will be removed once Parameter Server training switches to MindRT.
return os.getenv("PARALLEL_EXCUTE") != "ms_ps" and (_is_role_sched() or _is_role_pserver())
return False

View File

@ -16,7 +16,7 @@
import numpy as np
from mindspore import context, log as logger
from mindspore.context import ParallelMode
from mindspore._c_expression import reset_op_id
from mindspore._c_expression import reset_op_id, reset_op_id_with_offset
from mindspore.common.tensor import Tensor
from mindspore.common.dtype import dtype_to_nptype
from mindspore.common import dtype as mstype
@ -273,6 +273,11 @@ def _reset_op_id():
reset_op_id()
def _reset_op_id_with_offset():
    """Reset the operator id generator so later ids restart from 0 with an offset."""
    reset_op_id_with_offset()
def _parallel_predict_check():
"""validate parallel model prediction"""
if _get_parallel_mode() in (ParallelMode.SEMI_AUTO_PARALLEL, ParallelMode.AUTO_PARALLEL):

View File

@ -30,7 +30,8 @@ from .._checkparam import check_input_data, check_output_data, Validator
from .callback import _InternalCallbackParam, RunContext, _CallbackManager, Callback
from .. import context
from ..parallel._utils import _get_parallel_mode, _get_device_num, _get_global_rank, \
_get_parameter_broadcast, _device_number_check, _parameter_broadcast_check, _parallel_predict_check
_get_parameter_broadcast, _device_number_check, _parameter_broadcast_check, _parallel_predict_check, \
_reset_op_id_with_offset
from ..parallel._ps_context import _is_role_worker, _is_role_pserver, _is_role_sched, _is_ps_mode, _cache_enable, \
_enable_distributed_mindrt
from ..nn.metrics import Loss
@ -918,6 +919,11 @@ class Model:
sink_size=sink_size,
initial_epoch=initial_epoch)
# When it's Parameter Server training and using MindRT,
# the node id should be reset to start from 0.
if _is_ps_mode() and _enable_distributed_mindrt():
_reset_op_id_with_offset()
def build(self, train_dataset=None, valid_dataset=None, sink_size=-1, epoch=1, jit_config=None):
"""
Build computational graphs and data graphs with the sink mode.
@ -1100,8 +1106,16 @@ class Model:
with _CallbackManager(callbacks) as list_callback:
if dataset_sink_mode:
return self._eval_dataset_sink_process(valid_dataset, list_callback, cb_params)
return self._eval_process(valid_dataset, list_callback, cb_params)
eval_result = self._eval_dataset_sink_process(valid_dataset, list_callback, cb_params)
else:
eval_result = self._eval_process(valid_dataset, list_callback, cb_params)
# When it's Parameter Server training and using MindRT,
# the node id should be reset to start from 0.
if _is_ps_mode() and _enable_distributed_mindrt():
_reset_op_id_with_offset()
return eval_result
def predict(self, *predict_data):
"""
@ -1129,6 +1143,12 @@ class Model:
result = self._predict_network(*predict_data)
check_output_data(result)
# When it's Parameter Server training and using MindRT,
# the node id should be reset to start from 0.
if _is_ps_mode() and _enable_distributed_mindrt():
_reset_op_id_with_offset()
return result
def _infer_train_check(self, train_dataset, dataset_sink_mode, sink_size):