!8275 GPU add loopsink

From: @VectorSL
Reviewed-by: 
Signed-off-by:
This commit is contained in:
mindspore-ci-bot 2020-11-13 09:25:16 +08:00 committed by Gitee
commit 7a3a4d3ad4
5 changed files with 46 additions and 10 deletions

View File

@ -347,7 +347,12 @@ void GPUSession::RunGraphImpl(const GraphId &graph_id, const std::vector<tensor:
InitPSParamAndOptim(kernel_graph, inputs);
#endif
MS_EXCEPTION_IF_NULL(kernel_graph);
Execute(kernel_graph);
// It's InitDataset graph if kernel_num == 1, skip the loop.
int kernel_num = kernel_graph->execution_order().size();
int64_t loopsize = (kernel_num > 1) ? ConfigManager::GetInstance().gpu_loopsink_size() : 1;
for (int64_t i = 0; i < loopsize; i++) {
Execute(kernel_graph);
}
PostLoadTensor(kernel_graph);
// Summary
auto context_ptr = MsContext::GetInstance();

View File

@ -536,7 +536,6 @@ static std::string PrintArgs(const py::tuple &args) {
bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py::object &phase, bool use_vm) {
bool ret_value = false;
try {
MS_LOG(DEBUG) << PrintArgs(args);
ret_value = CompileInner(obj, args, phase, use_vm);
@ -577,7 +576,6 @@ bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py:
std::string exName(abi::__cxa_current_exception_type()->name());
MS_LOG(EXCEPTION) << "Error occurred when compile graph. Exception name: " << exName;
}
return ret_value;
}
@ -658,6 +656,17 @@ void Pipeline::Run() {
#endif
MS_LOG(DEBUG) << "Action " << action.first << " end.";
};
if (action.first == "task_emit") {
auto func_graph = resource_->func_graph();
if (func_graph != nullptr && func_graph->manager() != nullptr) {
auto manager = func_graph->manager();
size_t graph_nums = manager->func_graphs().size();
if (graph_nums == 1) {
resource_->set_gpu_loopsink_flag(true);
MS_LOG(INFO) << "Change gpu_loopsink_flag_ to true.";
}
}
}
if (!result) {
MS_LOG(EXCEPTION) << "Pipeline running to end, failed in step:" << action.first;
}
@ -823,10 +832,22 @@ py::object ExecutorPy::Run(const py::tuple &args, const py::object &phase) {
if (run == nullptr) {
MS_LOG(EXCEPTION) << "Can't find run graph func for " << phase_s;
}
// Set loopsink size for each phase.
bool is_loopsink = info_[phase_s]->resource->gpu_loopsink_flag();
int64_t sinksize = ConfigManager::GetInstance().iter_num();
ConfigManager::GetInstance().set_gpu_loopsink_size(is_loopsink ? sinksize : 1);
// If target is not gpu or is loopsink, keep vmloop 1.
bool g = (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice);
int64_t vm_loop = (!g || is_loopsink) ? 1 : sinksize;
MS_LOG(INFO) << "VM loop size " << vm_loop << ", loopsink size " << (is_loopsink ? sinksize : 1);
py::object ret;
MS_LOG(DEBUG) << "Eval run" << backend;
BaseRef value = (*run)(execute_info->arg_list);
for (int64_t i = 0; i < vm_loop; i++) {
BaseRef value = (*run)(execute_info->arg_list);
ret = BaseRefToPyData(value);
}
MS_LOG(DEBUG) << "Run end";
return BaseRefToPyData(value);
return ret;
}
FuncGraphPtr ExecutorPy::BuildGraph(const py::dict &init_params, const std::string &phase,

View File

@ -74,6 +74,9 @@ class Resource : public ResourceBase {
const abstract::AbstractBasePtrList &args_spec() const { return args_spec_; }
void set_args_spec(const abstract::AbstractBasePtrList &args_spec) { args_spec_ = args_spec; }
void set_gpu_loopsink_flag(const bool &flag) { gpu_loopsink_flag_ = flag; }
bool gpu_loopsink_flag() { return gpu_loopsink_flag_; }
// Reclaim resource and clear the cache.
// ExecutorPy::Compile() can be called multiple times, so cache
// should be cleared.
@ -85,6 +88,7 @@ class Resource : public ResourceBase {
abstract::AbstractBasePtrList args_spec_;
py::object input_;
bool is_cleaned_;
bool gpu_loopsink_flag_{false};
};
using ResourcePtr = std::shared_ptr<pipeline::Resource>;

View File

@ -106,6 +106,10 @@ class ConfigManager {
std::map<std::string, std::string> ge_initialize_options_;
int64_t gpu_loopsink_size() const { return gpu_loopsink_size_; }
void set_gpu_loopsink_size(const int64_t size) { gpu_loopsink_size_ = size; }
private:
ConfigManager() = default;
~ConfigManager() = default;
@ -115,6 +119,7 @@ class ConfigManager {
DatasetGraphParam dataset_param_{"", 0, 0, {}, {}, {}};
int64_t iter_num_{1};
std::string dataset_phase_{""};
int64_t gpu_loopsink_size_{1};
};
} // namespace mindspore

View File

@ -45,7 +45,7 @@ def connect_network_with_dataset(network, dataset_helper):
data channel corresponding to the 'queue_name' and passed to the input network during forward computation.
Note:
In the case of running the network on Ascend in graph mode, this function will wrap the input network with
In the case of running the network on Ascend/GPU in graph mode, this function will wrap the input network with
'GetNext', in other cases, the input network will be returned with no change.
The 'GetNext' is required to get data only in sink mode, so this function is not applicable to no-sink mode.
@ -88,8 +88,8 @@ def connect_network_with_dataset(network, dataset_helper):
if isinstance(dataset_iter, _DatasetIterNormal):
raise RuntimeError("Dataset should be connected with network only in sink mode.")
if not hasattr(dataset, '__ME_INITED__') and context.get_context("device_target") == "Ascend" and \
not context.get_context("enable_ge"):
if not hasattr(dataset, '__ME_INITED__') and (context.get_context("device_target") == "Ascend" \
or context.get_context("device_target") == "GPU") and not context.get_context("enable_ge"):
dataset.__ME_INITED__ = True
dataset_types, dataset_shapes = dataset_helper.types_shapes()
queue_name = dataset.__TRANSFER_DATASET__.queue_name
@ -139,7 +139,7 @@ class DatasetHelper:
if ms_role in ("MS_PSERVER", "MS_SCHED"):
iterclass = _DatasetIterPSLite
else:
iterclass = _DatasetIterMS
iterclass = _DatasetIterMSLoopSink
elif context.get_context("device_target") == "CPU":
raise RuntimeError("Currently dataset sink mode is not supported when the device target is CPU.")
self.iter = iterclass(dataset, sink_size, epoch_num)
@ -218,7 +218,8 @@ class _DatasetIter:
if hasattr(self.dataset, '__loop_size__'):
sink_size = self.dataset.__loop_size__
else:
if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend":
if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend" \
or context.get_context("device_target") == "GPU":
if self.sink_size > 0:
sink_size = self.sink_size
else: