auto parallel adasum uts and checks

This commit is contained in:
yao_yf 2022-02-17 19:33:13 +08:00
parent 59b30ef4a6
commit 4b79d4c425
3 changed files with 209 additions and 4 deletions

View File

@ -877,6 +877,34 @@ void HandleAdaSumSqueeze(const AnfNodePtr &stridedslice_node1, const std::vector
}
}
void HandleAdaSumPureModelParallel(const AnfNodePtr &node) {
if (!IsPrimitiveCNode(node, prim::kPrimSend) && !IsPrimitiveCNode(node, prim::kPrimReceive)) {
return;
}
PrimitivePtr send_rec_prim = GetCNodePrimitive(node);
int64_t origin_dest_rank = GetValue<int64_t>(send_rec_prim->GetAttr("opposite_rank"));
int64_t rank = g_device_manager->global_rank();
CNodePtr cnode = node->cast<CNodePtr>();
auto pre_cnode = RealInputNode(cnode, 1);
int64_t rank_dis = abs(origin_dest_rank - rank);
if (rank_dis == ADASUM_MIN_DIS && IsPrimitiveCNode(pre_cnode, prim::kPrimStridedSlice)) {
auto squeeze_node = pre_cnode->cast<CNodePtr>()->input(1);
if (!IsPrimitiveCNode(squeeze_node, prim::kPrimSqueeze)) {
return;
}
auto squeeze_input = squeeze_node->cast<CNodePtr>()->input(1);
auto manager = squeeze_node->func_graph()->manager();
AnfNodeIndexSet squeeze_input_node_user_set = manager->node_users()[squeeze_input];
for (auto &squeeze_input_user : squeeze_input_node_user_set) {
if (IsPrimitiveCNode(squeeze_input_user.first, prim::kPrimSqueeze) ||
IsPrimitiveCNode(squeeze_input_user.first, prim::kPrimUpdateState)) {
continue;
}
manager->Replace(squeeze_input_user.first, squeeze_input);
}
}
}
bool HandleAdaSum(const FuncGraphPtr &root, const std::vector<AnfNodePtr> &all_nodes,
std::unordered_map<std::string, std::shared_ptr<TensorLayout>> *adasum_param_tensor_layout_map) {
std::unordered_map<std::string, CNodePtr> forward_origin_first_node_map;
@ -901,6 +929,12 @@ bool HandleAdaSum(const FuncGraphPtr &root, const std::vector<AnfNodePtr> &all_n
target_param = GetValue<std::string>(prim->GetAttr("target_param"));
auto target_param_layout = (*adasum_param_tensor_layout_map)[target_param];
RankList group_devices = GetRankListByLayout(target_param_layout);
// only model parallel
if (group_devices.size() == 1) {
HandleAdaSumPureModelParallel(node);
continue;
}
int64_t adasum_rank_distance = (group_devices.back() - group_devices.front()) / (group_devices.size() - 1);
// when the repeat dim is right, the parameter do not enable adasum.
if (adasum_rank_distance == 1 && group_devices.size() < size_t(g_device_manager->stage_device_num())) {

View File

@ -16,6 +16,10 @@
import copy
import hashlib
import math
import mindspore.nn as nn
import mindspore.log as logger
from mindspore import context
from mindspore._checkparam import Validator as validator
from mindspore.nn.cell import Cell
from mindspore.common.parameter import ParameterTuple, Parameter
from mindspore.parallel._utils import _get_global_rank, _get_stage_device_num
@ -353,25 +357,47 @@ _clone_weight = C.MultitypeFuncGraph("_clone_weight")
def _clone_weight_process(scale, weight):
return scale_mul(weight, scale)
def _parallel_check():
if context.get_auto_parallel_context("parallel_mode") not in ["semi_auto_parallel",
"auto_parallel", "data_parallel"]:
raise RuntimeError("Stand alone and hybrid parallel mode is not supported to apply adasum.")
if context.get_auto_parallel_context("parallel_mode") == "data_parallel":
logger.warning("For data parallel mode, it is recommended to using mindspore.boost to enable adasum.")
if context.get_auto_parallel_context("enable_parallel_optimizer"):
raise RuntimeError("Currently, the optimizer shard is not supported with applying adasum.")
if context.get_auto_parallel_context("pipeline_stages") > 1:
raise RuntimeError("Currently, the pipeline parallel is not supported with applying adasum.")
class AdaSumByGradWrapCell(Cell):
r"""
Enable the adasum in "auto_parallel/semi_auto_parallel" mode.
Args:
optimizer (Union[Cell]): Optimizer for updating the weights.
optimizer (Union[Cell]): Optimizer for updating the weights. The construct function of the optimizer
requires only one input.
Inputs:
- **grads** (Tuple(Tensor)) - Tuple of gradients.
Examples:
>>> from mindspore import nn
>>> from mindspore.nn import AdaSumByGradWrapCell
>>> net = Net()
>>> optim = AdaSumByGradWrapCell(nn.Momentum(params=net.trainable_params(), learning_rate=0.1, momentum=0.9))
>>> loss = nn.SoftmaxCrossEntropyWithLogits()
>>> model = Model(net, loss_fn=loss, optimizer=optim, metrics=None)
"""
def __init__(self, optimizer):
super(AdaSumByGradWrapCell, self).__init__(auto_prefix=False)
_device_number = 8
_parallel_check()
self.optimizer = optimizer
validator.check_value_type('optimizer', optimizer, (nn.Optimizer,))
self.parameters = optimizer.parameters
self.hyper_map = C.HyperMap()
_device_number = 8
group_number = _get_stage_device_num() // _device_number
self.grad_clone = ParameterTuple(self.parameters)
self.adasum = _AdaSumByGrad(_get_global_rank, _device_number, group_number, self.grad_clone)
self.adasum = _AdaSumByGrad(_get_global_rank(), _device_number, group_number, self.grad_clone)
self.sync_tensor = Parameter(Tensor(0, dtype=mstype.int32))
def construct(self, grads):
@ -386,14 +412,25 @@ class AdaSumByDeltaWeightWrapCell(Cell):
Enable the adasum in "auto_parallel/semi_auto_parallel" mode.
Args:
optimizer (Union[Cell]): Optimizer for updating the weights.
optimizer (Union[Cell]): Optimizer for updating the weights. The construct function of the optimizer
requires only one input.
Inputs:
- **grads** (Tuple(Tensor)) - Tuple of gradients.
Examples:
>>> from mindspore import nn
>>> from mindspore.nn import AdaSumByDeltaWeightWrapCell
>>> net = Net()
>>> optim = AdaSumByGradWrapCell(nn.Momentum(params=net.trainable_params(), learning_rate=0.1, momentum=0.9))
>>> loss = nn.SoftmaxCrossEntropyWithLogits()
>>> model = Model(net, loss_fn=loss, optimizer=optim, metrics=None)
"""
def __init__(self, optimizer):
super(AdaSumByDeltaWeightWrapCell, self).__init__(auto_prefix=False)
_parallel_check()
self.optimizer = optimizer
validator.check_value_type('optimizer', optimizer, (nn.Optimizer,))
self.parameters = optimizer.parameters
self.hyper_map = C.HyperMap()
_device_number = 8

View File

@ -0,0 +1,134 @@
# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import mindspore as ms
from mindspore import context, Tensor, Parameter
from mindspore.common.api import _cell_graph_executor
from mindspore.nn import Cell, TrainOneStepCell, Momentum, AdaSumByDeltaWeightWrapCell, AdaSumByGradWrapCell
from mindspore.ops import operations as P
class Net(Cell):
def __init__(self, strategy1=None, strategy2=None, strategy3=None):
super().__init__()
self.mul = P.Mul().shard(strategy1)
self.matmul = P.MatMul().shard(strategy2)
self.gather = P.Gather().shard(strategy3)
self.reduce_sum = P.ReduceSum()
self.mul_weight = Parameter(Tensor(np.ones([64, 32]), dtype=ms.float32), "w1")
self.matmul_weight = Parameter(Tensor(np.ones([32, 32]), dtype=ms.float32), "w2")
self.embedding_table = Parameter(Tensor(np.ones([64, 32]), dtype=ms.float32), "embedding_table")
def construct(self, x, b):
out = self.gather(self.embedding_table, x, 0)
out = self.matmul(out, self.matmul_weight)
out = self.mul(out, self.mul_weight)
out = out + b
return self.reduce_sum(out)
_x = Tensor(np.ones([64]), dtype=ms.int32)
_b = Tensor(np.ones([64, 32]), dtype=ms.float32)
def compile_net(net, by_grad=True):
if by_grad:
optimizer = AdaSumByGradWrapCell(Momentum(net.trainable_params(), learning_rate=0.1, momentum=0.9))
else:
optimizer = AdaSumByDeltaWeightWrapCell(Momentum(net.trainable_params(), learning_rate=0.1, momentum=0.9))
train_net = TrainOneStepCell(net, optimizer)
train_net.set_auto_parallel()
train_net.set_train()
_cell_graph_executor.compile(train_net, _x, _b)
context.reset_auto_parallel_context()
def test_auto_parallel_adasum1():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank0, dp, mp, not_full_dp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=0)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 1), (1, 1))
gather_strategy3 = ((1, 1), (32,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net)
def test_auto_parallel_adasum2():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank0, dp, mp, not_full_dp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=0)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 1), (1, 1))
gather_strategy3 = ((1, 1), (32,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net, by_grad=False)
def test_auto_parallel_adasum3():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank0, mix_dp_mp, mp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=0)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 4), (4, 1))
gather_strategy3 = ((32, 1), (1,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net)
def test_auto_parallel_adasum4():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank0, mix_dp_mp, mp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=0)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 4), (4, 1))
gather_strategy3 = ((32, 1), (1,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net, by_grad=False)
def test_auto_parallel_adasum5():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank16, dp, mp, not_full_dp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=16)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 1), (1, 1))
gather_strategy3 = ((1, 1), (32,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net)
def test_auto_parallel_adasum6():
"""
Feature: adasum in auto parallel.
Description: verify adasum by mul/matmul/gather, rank16, dp, mp, not_full_dp
Expectation: compile done without error.
"""
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=32, global_rank=16)
mul_strategy1 = ((8, 4), (8, 4))
matmul_strategy2 = ((8, 1), (1, 1))
gather_strategy3 = ((1, 1), (32,))
net = Net(mul_strategy1, matmul_strategy2, gather_strategy3)
compile_net(net, by_grad=False)