gpu minddata 0921
This commit is contained in:
parent
a794cb1b82
commit
ad44ec72e8
|
@ -121,8 +121,7 @@ if(ENABLE_TESTCASES OR (NOT ENABLE_D))
|
|||
add_compile_definitions(NO_DLIB=1)
|
||||
endif()
|
||||
|
||||
if(NOT (ENABLE_TESTCASES OR ENABLE_TEST) AND NOT (CMAKE_SYSTEM_NAME MATCHES "Windows" OR
|
||||
CMAKE_SYSTEM_NAME MATCHES "Darwin") AND (ENABLE_D OR ENABLE_GPU OR ENABLE_CPU))
|
||||
if(NOT (ENABLE_TESTCASES OR ENABLE_TEST) AND (ENABLE_D OR ENABLE_GPU OR ENABLE_CPU))
|
||||
add_compile_definitions(WITH_BACKEND)
|
||||
endif()
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@
|
|||
#include "utils/trace_base.h"
|
||||
#include "include/common/utils/parallel_context.h"
|
||||
#include "kernel/oplib/oplib.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
|
||||
#include "ps/constants.h"
|
||||
#include "ps/util.h"
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
namespace mindspore {
|
||||
namespace memreuse {
|
||||
constexpr auto kSplitC = '/';
|
||||
class MemReuseChecker {
|
||||
class BACKEND_EXPORT MemReuseChecker {
|
||||
public:
|
||||
static MemReuseChecker &GetInstance();
|
||||
MemReuseChecker(const MemReuseChecker &) = delete;
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#define MINDSPORE_CCSRC_DISTRIBUTED_INIT_H_
|
||||
|
||||
#include "distributed/collective/collective_manager.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
#else
|
||||
#include "distributed/cluster/dummy_cluster_context.h"
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "include/common/utils/anfalgo.h"
|
||||
#include "include/common/debug/draw.h"
|
||||
#include "include/common/utils/parallel_context.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/ps_context.h"
|
||||
#endif
|
||||
|
||||
|
@ -336,7 +336,7 @@ distributed::DistExecutionMode GenerateStrategy() {
|
|||
distributed::DistExecutionMode strategy;
|
||||
bool enable_ps = false;
|
||||
bool enable_embedding_cache = false;
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
enable_ps = ps::PSContext::instance()->is_ps_mode();
|
||||
enable_embedding_cache = ps::PSContext::instance()->cache_enable();
|
||||
#endif
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "include/common/utils/utils.h"
|
||||
#include "ir/func_graph.h"
|
||||
#include "distributed/constants.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
#else
|
||||
#include "distributed/cluster/dummy_cluster_context.h"
|
||||
|
|
|
@ -26,9 +26,8 @@
|
|||
#include "frontend/parallel/dynamic_creator.h"
|
||||
#include "frontend/parallel/graph_util/generate_graph.h"
|
||||
#include "include/common/utils/parallel_context.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
|
||||
#include "utils/ms_context.h"
|
||||
#include "ps/ps_context.h"
|
||||
#include "distributed/embedding_cache/embedding_cache_utils.h"
|
||||
#endif
|
||||
|
@ -163,7 +162,7 @@ Status GatherInfo::GetAttrs() {
|
|||
if (std::find(inputs_shape_[1].begin(), inputs_shape_[1].end(), -1) != inputs_shape_[1].end()) {
|
||||
dynamic_shape_indices_ = true;
|
||||
}
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PsDataPrefetch::GetInstance().cache_enable()) {
|
||||
dynamic_shape_indices_ = true;
|
||||
}
|
||||
|
@ -757,7 +756,7 @@ Status GatherInfo::InferBias() {
|
|||
rank = rank % (params_strategy[0] * params_strategy[1]);
|
||||
}
|
||||
}
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PsDataPrefetch::GetInstance().cache_enable()) {
|
||||
if (ps::PSContext::instance()->enable_distributed_mindrt()) {
|
||||
bias_ = static_cast<int64_t>(embedding_cache_table_manager.cache_indices_lower_bound());
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "frontend/parallel/strategy.h"
|
||||
#include "include/common/utils/parallel_context.h"
|
||||
#include "frontend/parallel/tensor_layout/tensor_redistribution.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
|
||||
#include "ps/ps_context.h"
|
||||
#include "distributed/embedding_cache/embedding_cache_utils.h"
|
||||
|
@ -102,7 +102,7 @@ std::vector<StrategyPtr> UniqueInfo::GenerateOpStrategies(int64_t stage_id) {
|
|||
|
||||
return sp_vector;
|
||||
}
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
Status UniqueInfo::ComputeReplaceGraph(const CNodePtr &cnode) {
|
||||
GenerateGraph gen_g = GenerateGraph(attrs_);
|
||||
if (gen_g.Init(cnode) != SUCCESS) {
|
||||
|
@ -142,7 +142,7 @@ Status UniqueInfo::ComputeReplaceGraph(const CNodePtr &cnode) {
|
|||
}
|
||||
#endif
|
||||
ReplaceGraphPtr UniqueInfo::replace_graph(const CNodePtr &cnode) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PsDataPrefetch::GetInstance().cache_enable()) {
|
||||
auto inputs = cnode->inputs();
|
||||
if (inputs.empty()) {
|
||||
|
|
|
@ -48,7 +48,7 @@
|
|||
#include "ir/anf.h"
|
||||
#include "ir/param_info.h"
|
||||
#include "ir/tensor.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/util.h"
|
||||
#endif
|
||||
|
||||
|
@ -71,7 +71,7 @@ void SearchParallelStrategy(const std::string &strategy_search_mode, const FuncG
|
|||
}
|
||||
|
||||
bool StepAutoParallel(const FuncGraphPtr &root, const opt::OptimizerPtr &) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::Util::IsRoleOfPServer() || ps::Util::IsRoleOfScheduler()) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "minddata/dataset/engine/datasetops/data_queue_op.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#include "mindspore/core/utils/numa_interface.h"
|
||||
#include "utils/ms_context.h"
|
||||
#endif
|
||||
#include "minddata/dataset/util/task_manager.h"
|
||||
#include "minddata/dataset/util/service.h"
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
#include "backend/graph_compiler/transform.h"
|
||||
#include "load_mindir/infer_mindir.h"
|
||||
#include "debug/data_dump/dump_json_parser.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/scheduler.h"
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
#endif
|
||||
|
@ -127,7 +127,7 @@ void DisableMindRT(const ResourcePtr &resource) {
|
|||
if (context_ptr->get_param<bool>(MS_CTX_ENABLE_MINDRT) == false) {
|
||||
return;
|
||||
}
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PSContext::instance()->cache_enable()) {
|
||||
return;
|
||||
}
|
||||
|
@ -863,7 +863,7 @@ bool OptInlineAction(const ResourcePtr &resource) {
|
|||
bool GeOptimizeAction(const ResourcePtr &resource) { return OptimizeAction(resource, kGePasses); }
|
||||
|
||||
bool VmOptimizeAction(const ResourcePtr &resource) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PSContext::instance()->is_ps_mode()) {
|
||||
(void)kVmPasses.emplace_back(PassItem("server_communication_op_fusion", ps::Util::FuseServerCommOps));
|
||||
}
|
||||
|
@ -1121,7 +1121,7 @@ void SetRunMode(const FuncGraphPtr &func_graph, compile::Backend *backend_ptr) {
|
|||
return;
|
||||
}
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PSContext::instance()->cache_enable()) {
|
||||
set_ctx(true, false, false);
|
||||
return;
|
||||
|
@ -1182,7 +1182,7 @@ void SetRunMode(const ResourcePtr &resource) {
|
|||
auto enable_hccl = context_ptr->get_param<bool>(MS_CTX_ENABLE_HCCL);
|
||||
// After the distributed interface on D is unified, the following flag and judgment will be removed.
|
||||
bool enbale_distributed_mindrt = false;
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
enbale_distributed_mindrt = ps::PSContext::instance()->enable_distributed_mindrt();
|
||||
#endif
|
||||
if (!is_task_sink && mode == kGraphMode && enable_hccl && !common::UseMPI() && !enbale_distributed_mindrt) {
|
||||
|
@ -1278,7 +1278,7 @@ bool ExecuteAction(const ResourcePtr &resource) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
bool StartPSSchedulerAction(const ResourcePtr &) {
|
||||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
MS_LOG(INFO) << "This node is scheduler. Start wait for finalizing.";
|
||||
|
@ -1574,7 +1574,7 @@ std::vector<ActionItem> VmPipeline(const ResourcePtr &resource) {
|
|||
(void)actions.emplace_back(std::make_pair("validate", ValidateAction));
|
||||
}
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
(void)actions.emplace_back(std::make_pair("distribtued_split", DistributedSplitAction));
|
||||
if (ps::PSContext::instance()->is_worker()) {
|
||||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
|
@ -1611,7 +1611,7 @@ std::vector<ActionItem> MindIRPipeline() {
|
|||
return actions;
|
||||
}
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
std::vector<ActionItem> PSchedulerPipeline(const ResourcePtr &resource) {
|
||||
if (resource->EnableCompileCache() && resource->func_graph() != nullptr) {
|
||||
return {std::make_pair("scheduler", StartPSSchedulerAction)};
|
||||
|
|
|
@ -37,13 +37,17 @@ bool GeOptimizeAction(const ResourcePtr &resource);
|
|||
bool VmOptimizeAction(const ResourcePtr &resource);
|
||||
bool TaskEmitAction(const ResourcePtr &resource);
|
||||
bool ExecuteAction(const ResourcePtr &resource);
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
bool StartPSSchedulerAction(const ResourcePtr &resource);
|
||||
bool DistributedSplitAction(const ResourcePtr &resource);
|
||||
#endif
|
||||
|
||||
std::vector<ActionItem> GePipeline();
|
||||
std::vector<ActionItem> VmPipeline(const ResourcePtr &resource);
|
||||
std::vector<ActionItem> MindIRPipeline();
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
std::vector<ActionItem> PSchedulerPipeline(const ResourcePtr &resource);
|
||||
#endif
|
||||
abstract::AnalysisResult AbstractAnalyze(const ResourcePtr &resource, const FuncGraphPtr &func_graph,
|
||||
const abstract::AbstractBasePtrList &args_abs, bool clear = false);
|
||||
FuncGraphPtr ProgramSpecialize(const ResourcePtr &resource, const FuncGraphPtr &func_graph,
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
#include "frontend/parallel/step_parallel.h"
|
||||
#include "mindspore/core/utils/file_utils.h"
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/core/node.h"
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
#endif
|
||||
|
@ -69,7 +69,7 @@ std::string GetCompileCacheDir() {
|
|||
}
|
||||
|
||||
std::string GetRole() {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
auto node = distributed::cluster::ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
|
|
|
@ -67,7 +67,7 @@
|
|||
#include "runtime/device/stream_synchronizer.h"
|
||||
#include "distributed/collective/collective_manager.h"
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/constants.h"
|
||||
#include "ps/util.h"
|
||||
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
|
||||
|
@ -748,7 +748,7 @@ std::vector<ActionItem> GetPipeline(const ResourcePtr &resource, const std::stri
|
|||
auto ms_context = MsContext::GetInstance();
|
||||
MS_EXCEPTION_IF_NULL(ms_context);
|
||||
std::string backend = ms_context->backend_policy();
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
auto node = distributed::cluster::ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
|
@ -1448,7 +1448,7 @@ bool InitExecDataset(const std::string &queue_name, int64_t iter_num, int64_t ba
|
|||
bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batch_size,
|
||||
const std::vector<TypePtr> &types, const std::vector<std::vector<int64_t>> &shapes,
|
||||
const std::vector<int64_t> &input_indexes, bool need_run) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if ((ps::PSContext::instance()->is_ps_mode()) && (!ps::PSContext::instance()->is_worker())) {
|
||||
return true;
|
||||
}
|
||||
|
@ -1503,7 +1503,7 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc
|
|||
MS_EXCEPTION_IF_NULL(backend);
|
||||
// The data set graph compiling and running of mindRT.
|
||||
if (MsContext::GetInstance()->get_param<bool>(MS_CTX_ENABLE_MINDRT)) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PSContext::instance()->is_worker() && ps::PSContext::instance()->cache_enable()) {
|
||||
ps::PsDataPrefetch::GetInstance().CreateDataChannel(queue_name, LongToSize(size));
|
||||
}
|
||||
|
@ -1529,7 +1529,7 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc
|
|||
auto runner = convert_fn(segment, "");
|
||||
ConfigManager::GetInstance().set_iter_num(queue_name, size);
|
||||
// PS cache does not support loop sink.
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ps::PSContext::instance()->is_worker() && ps::PsDataPrefetch::GetInstance().cache_enable()) {
|
||||
ps::PsDataPrefetch::GetInstance().CreateDataChannel(queue_name, LongToSize(size));
|
||||
ConfigManager::GetInstance().set_iter_num(queue_name, 1);
|
||||
|
@ -1898,7 +1898,7 @@ bool PyIsCipherFile(const std::string &file_path) { return mindspore::IsCipherFi
|
|||
|
||||
void FinalizeCluster() {
|
||||
MS_LOG(INFO) << "Start finalize the cluster instance.";
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
runtime::EmbeddingCacheScheduler::GetInstance().Finalize();
|
||||
(void)distributed::cluster::ClusterContext::instance()->Finalize(UINT32_MAX);
|
||||
|
|
|
@ -47,7 +47,7 @@
|
|||
#include "backend/common/session/anf_runtime_algorithm.h"
|
||||
#include "include/common/utils/anfalgo.h"
|
||||
#include "plugin/device/cpu/hal/profiler/cpu_profiling.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "plugin/device/cpu/hal/hardware/ms_collective_comm_lib.h"
|
||||
#endif
|
||||
#ifndef ENABLE_SECURITY
|
||||
|
@ -407,7 +407,7 @@ bool CPUDeviceResManager::LoadCollectiveCommLib() {
|
|||
collective_comm_lib_ = instance_func();
|
||||
MS_EXCEPTION_IF_NULL(collective_comm_lib_);
|
||||
} else {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
collective_comm_lib_ = &MsCollectiveCommLib::GetInstance();
|
||||
MS_EXCEPTION_IF_NULL(collective_comm_lib_);
|
||||
#endif
|
||||
|
|
|
@ -20,13 +20,13 @@
|
|||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "plugin/device/cpu/hal/hardware/ms_collective_comm_lib.h"
|
||||
#endif
|
||||
|
||||
namespace mindspore {
|
||||
namespace kernel {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
using device::CollectiveOpReduceType::Reduce_Sum;
|
||||
using device::cpu::kMCCLGlobalGroupName;
|
||||
using device::cpu::MsCollectiveCommLib;
|
||||
|
@ -37,7 +37,7 @@ constexpr char kSupportedReduceOp[] = "sum";
|
|||
} // namespace
|
||||
|
||||
void AllReduceCPUKernelMod::InitKernel(const CNodePtr &kernel_node) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
MS_EXCEPTION_IF_NULL(kernel_node);
|
||||
kernel_name_ = common::AnfAlgo::GetCNodeName(kernel_node);
|
||||
auto kernel_attr = GetKernelAttrFromNode(kernel_node);
|
||||
|
@ -67,7 +67,7 @@ std::vector<KernelAttr> AllReduceCPUKernelMod::GetOpSupport() {
|
|||
bool AllReduceCPUKernelMod::Launch(const std::vector<kernel::AddressPtr> &inputs,
|
||||
const std::vector<kernel::AddressPtr> &,
|
||||
const std::vector<kernel::AddressPtr> &outputs) {
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (inputs.empty() || outputs.empty()) {
|
||||
MS_LOG(EXCEPTION) << kernel_name_ << " has at least one input and one output, but got 0.";
|
||||
}
|
||||
|
|
|
@ -50,6 +50,10 @@ DataQueueStatus GpuDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
|
|||
return DataQueueStatus::SUCCESS;
|
||||
}
|
||||
|
||||
if (IsFull()) {
|
||||
return DataQueueStatus::TIMEOUT;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < data.size(); i++) {
|
||||
auto &item = data[i];
|
||||
if (item.data_ptr == nullptr) {
|
||||
|
@ -196,6 +200,9 @@ DataQueueStatus GpuDataQueue::Push(std::vector<DataQueueItem> data) {
|
|||
if (data.empty()) {
|
||||
return DataQueueStatus::SUCCESS;
|
||||
}
|
||||
if (IsFull()) {
|
||||
return DataQueueStatus::TIMEOUT;
|
||||
}
|
||||
size_t data_len =
|
||||
std::accumulate(data.begin(), data.end(), 0, [](size_t accum, const auto &it) { return accum + it.data_len; });
|
||||
void *device_addr = nullptr;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_SET_H_
|
||||
#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_SET_H_
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#define ENABLE_RPC_ACTOR
|
||||
#endif
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#include "utils/log_adapter.h"
|
||||
#include "include/common/utils/convert_utils.h"
|
||||
#include "distributed/recovery/recovery_context.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "runtime/graph_scheduler/embedding_cache_scheduler.h"
|
||||
#endif
|
||||
|
||||
|
@ -926,7 +926,7 @@ void DataPrepareActor::PreprocessBeforePrepareData() const {
|
|||
// Embedding Cache mode needs to record the number of global steps executed by the compute graph.
|
||||
// The first step compute graph needs to wait for the Embedding cache prefetch cache to warm up to prevent the
|
||||
// GetNext operator from timing out in the compute graph.
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
EmbeddingCacheScheduler::GetInstance().IncreaseGraphStep(GetAID());
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include "runtime/device/stream_synchronizer.h"
|
||||
#include "distributed/recovery/recovery_context.h"
|
||||
#include "distributed/collective/collective_manager.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "runtime/graph_scheduler/rpc_node_scheduler.h"
|
||||
#endif
|
||||
|
||||
|
@ -107,7 +107,7 @@ void LoopCountActor::SendOutput(OpContext<DeviceTensor> *const context) {
|
|||
ActorDispatcher::Send(entrance_aid, &EntranceActor::ClearDataOnStepEnd, from_aid, context);
|
||||
}
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
// Update rpc actors' status.
|
||||
RpcActorStatusUpdater::GetInstance().UpdateRpcActorStatus();
|
||||
#endif
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
#ifndef ENABLE_SECURITY
|
||||
#include "debug/data_dump/dump_json_parser.h"
|
||||
#endif
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "ps/ps_context.h"
|
||||
#include "runtime/graph_scheduler/embedding_cache_scheduler.h"
|
||||
#endif
|
||||
|
@ -374,7 +374,7 @@ GraphId GraphCompiler::CompileWholeGraphForGraphRunMode(const FuncGraphPtr &func
|
|||
// set executing sink true in graph mode
|
||||
root_graph->set_run_mode(device::RunMode::kGraphMode);
|
||||
root_graph->set_is_loop_count_sink(true);
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
// Embedding cache need global step of compute graph, can not enable loop sink, move loop control to loop count actor.
|
||||
if (ps::PSContext::instance()->cache_enable()) {
|
||||
root_graph->set_is_loop_count_sink(false);
|
||||
|
@ -473,7 +473,7 @@ GraphId GraphCompiler::CompileGraphImpl(const KernelGraphPtr &graph, const Devic
|
|||
DumpJsonParser::GetInstance().UpdateNeedDumpKernels(*graph.get());
|
||||
#endif
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
// Set device address for embedding cache parameter, only enable when enable embedding cache mode.
|
||||
EmbeddingCacheScheduler::GetInstance().SetEmbedCachedParamAddress(device_context, graph);
|
||||
#endif
|
||||
|
|
|
@ -54,7 +54,7 @@
|
|||
#include "include/common/debug/common.h"
|
||||
#include "distributed/recovery/recovery_context.h"
|
||||
#include "distributed/collective/collective_manager.h"
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
#else
|
||||
#include "distributed/cluster/dummy_cluster_context.h"
|
||||
|
@ -195,7 +195,7 @@ void IntHandler(int, siginfo_t *, void *) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
void SendFinishTransform(const std::string &actor_set_name) {
|
||||
auto node = ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
|
@ -517,13 +517,13 @@ ActorSet *GraphScheduler::Transform(const GraphCompilerInfo &graph_compiler_info
|
|||
Optimize(actor_set);
|
||||
MS_LOG(INFO) << "Graph(" << graph_compiler_info.name_ << ") transforms actor end.";
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
if (ClusterContext::instance()->initialized() && RecoveryContext::GetInstance()->enable_recovery()) {
|
||||
SendFinishTransform(graph_compiler_info.name_);
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
// Save data channel for this actor set.
|
||||
MS_EXCEPTION_IF_NULL(actor_set->data_prepare_actor_);
|
||||
EmbeddingCacheScheduler::GetInstance().SetDataSetChannel(actor_set->data_prepare_actor_->GetAID(),
|
||||
|
@ -637,7 +637,7 @@ void GraphScheduler::Run(ActorSet *const actor_set, const std::vector<std::vecto
|
|||
const size_t kSecondsToMilliseconds = 1000;
|
||||
SetActorExecutionStrategy(actor_set, strategy, (end_time - start_time) * kSecondsToMilliseconds);
|
||||
|
||||
#ifdef WITH_BACKEND
|
||||
#if defined(__linux__) && defined(WITH_BACKEND)
|
||||
DoDisasterRecovery(actor_set->name_);
|
||||
#endif
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue