ut for allgather fusion

This commit is contained in:
jiahongQian 2022-02-24 10:55:23 +08:00
parent 8a2151d8bb
commit 25f57505bf
3 changed files with 109 additions and 9 deletions

@@ -219,7 +219,9 @@ MindSpore context is used to configure the current execution environment, including the execution mode, execution
- **comm_fusion** (dict) - Sets the fusion configuration of communication operators. Communication operators of the same type can be fused and transmitted in chunks according to gradient tensor size or sequential order. The input format is {"communication_type": {"mode": str, "config": None, int or list}}, and the fusion configuration of each communication type has two keys: "mode" and "config". The following communication fusion types and configurations are supported (see the sketch after this excerpt):
- allreduce: performs communication fusion for AllReduce operators. "mode" can be "auto", "size" or "index". In "auto" mode, fusion is grouped by the size of the gradient variables; the default threshold is "64" MB and the value of "config" is None. In "size" mode, the user must specify the gradient-size threshold in the config dict, and the threshold must be larger than "0" MB. When "mode" is "index", it is the same as "all_reduce_fusion_config": the user passes a list to "config" in which each value is a gradient index.
- allgather: performs communication fusion for AllGather operators. "mode" can be "auto" or "size". The "auto" and "size" modes are configured in the same way as for AllReduce.
- reducescatter: performs communication fusion for ReduceScatter operators. "mode" can be "auto" or "size". The "auto" and "size" modes are configured in the same way as for AllReduce.
**Raises:**
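To make the documented format concrete, here is a minimal sketch (an editor's illustration, not part of this commit; the index values are arbitrary) combining an index-mode AllReduce fusion with a size-mode AllGather fusion:

from mindspore import context

# Fuse AllReduce operators at the listed gradient indices, and fuse
# AllGather operators once the accumulated gradient size reaches 32 MB.
comm_fusion_config = {
    "allreduce": {"mode": "index", "config": [20, 35]},
    "allgather": {"mode": "size", "config": 32},
}
context.set_auto_parallel_context(comm_fusion=comm_fusion_config)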

@@ -509,11 +509,19 @@ def set_auto_parallel_context(**kwargs):
It supports the following communication fusion types and configurations:
- allreduce: if communication fusion type is `allreduce`, the `mode` contains `auto`, `size`
and `index`. In `auto` mode, AllReduce fusion is configured by gradient size, and the default
fusion threshold is `64` MB. In `size` mode, AllReduce fusion is configured by gradient size
manually, and the fusion threshold must be larger than `0` MB. In `index` mode, it is the same
as `all_reduce_fusion_config`.
- allgather: if communication fusion type is `allgather`, the `mode` contains `auto` and
`size`. In `auto` mode, AllGather fusion is configured by gradient size, and the default
fusion threshold is `64` MB. In `size` mode, AllGather fusion is configured by gradient size
manually, and the fusion threshold must be larger than `0` MB.
- reducescatter: if communication fusion type is `reducescatter`, the `mode` contains `auto`
and `size`. The configuration is the same as for `allgather`.
Raises:
ValueError: If the input key is not an attribute of the auto parallel context.
@@ -535,8 +543,8 @@ def set_auto_parallel_context(**kwargs):
>>> context.set_auto_parallel_context(pipeline_stages=2)
>>> parallel_config = {"gradient_accumulation_shard": True}
>>> context.set_auto_parallel_context(parallel_optimizer_config=parallel_config, enable_parallel_optimizer=True)
>>> config = {"allreduce": {"mode": "size", "config": 32}, "allgather": {"mode": "size", "config": 32}}
>>> context.set_auto_parallel_context(comm_fusion=config)
"""
_set_auto_parallel_context(**kwargs)
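The unit tests later in this commit read the configured thresholds back through the auto-parallel context. A minimal sketch of that round trip, assuming (as the test file does) the getters allgather_fusion_threshold_mb and reducescatter_fusion_threshold_mb; whether they reflect the value immediately after set_auto_parallel_context or only after compilation is not shown here — test_fusion_auto checks them only after model.train:

from mindspore import context
from mindspore.parallel._auto_parallel_context import auto_parallel_context

# Configure explicit size-mode thresholds for both operator types.
context.set_auto_parallel_context(
    comm_fusion={"allgather": {"mode": "size", "config": 16},
                 "reducescatter": {"mode": "size", "config": 8}})

# The context getters used by the tests below expose the thresholds in MB.
print(auto_parallel_context().allgather_fusion_threshold_mb())
print(auto_parallel_context().reducescatter_fusion_threshold_mb())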
@@ -767,8 +775,8 @@ def set_context(**kwargs):
Indicates whether to enable graph kernel fusion to optimize network execution performance.
If enable_graph_kernel is set to True, acceleration can be enabled.
For details of graph kernel fusion, please check
`Enabling Graph Kernel Fusion
<https://www.mindspore.cn/docs/programming_guide/en/master/enable_graph_kernel_fusion.html>`_.
graph_kernel_flags (str): Optimization options for graph kernel fusion; it takes precedence
over enable_graph_kernel when the two conflict. Only for experienced users.
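As a usage sketch of the two knobs just described (the flag string below is an illustrative assumption, not taken from this commit):

from mindspore import context

# Turn on graph kernel fusion; graph_kernel_flags takes precedence
# over enable_graph_kernel when the two conflict.
context.set_context(enable_graph_kernel=True,
                    graph_kernel_flags="--opt_level=2")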
@@ -802,8 +810,8 @@ def set_context(**kwargs):
(Automatic selection).
For more information about enabling the operator tuning tool, please check
`Enable the operator optimization tool
<https://www.mindspore.cn/docs/programming_guide/en/master/enable_auto_tune.html>`_.
check_bprop (bool): Whether to check back propagation nodes. The checking ensures that the shape and dtype
of back propagation node outputs are the same as those of the input parameters. Default: False.
max_call_depth (int): Specifies the maximum depth of a function call. Must be a positive integer. Default: 1000.
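A short example of the two options above (the values are arbitrary):

from mindspore import context

# Check that backprop node outputs match the shape and dtype of the
# input parameters, and raise the call-depth limit above the default 1000.
context.set_context(check_bprop=True, max_call_depth=2000)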

@@ -13,6 +13,7 @@
# limitations under the License.
# ============================================================================
import numpy as np
import pytest
import mindspore as ms
import mindspore.nn as nn
from mindspore import context
@@ -23,6 +24,8 @@ from mindspore.common.initializer import initializer
from mindspore.train.model import Model
from mindspore.nn.wrap.cell_wrapper import PipelineCell
from mindspore.parallel._auto_parallel_context import auto_parallel_context
from tests.ut.python.parallel.test_adafactor import compile_net
from tests.ut.python.parallel.test_adafactor import Net as Net2
class DatasetLenet():
@@ -146,3 +149,90 @@ def test_fusion_auto():
    model.train(2, dataset, dataset_sink_mode=False)
    assert auto_parallel_context().allgather_fusion_threshold_mb() == 64
    assert auto_parallel_context().reducescatter_fusion_threshold_mb() == 64


def test_fusion_optimizer_parallel():
    """
    Feature: test_fusion_optimizer_parallel in size mode
    Description: allgather and reducescatter size fusion in optimizer parallel
    Expectation: compile success
    """
    allgather_threshold = 16
    reducescatter_threshold = 8
    comm_fusion_dict = {"allgather": {"mode": "size", "config": allgather_threshold},
                        "reducescatter": {"mode": "size", "config": reducescatter_threshold}}
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=16, global_rank=0,
                                      enable_parallel_optimizer=True, comm_fusion=comm_fusion_dict)
    _w0 = Tensor(np.ones([64, 16, 2]), dtype=ms.float32)
    _w1 = Tensor(np.ones([32, 32]), dtype=ms.float32)
    _w2 = Tensor(np.ones([32]), dtype=ms.float32)
    strategy1 = ((4, 2), (2, 2))
    strategy2 = ((4, 2), (2,))
    net = Net2(_w0, _w1, _w2, strategy1, strategy2)
    compile_net(net)

    # Recompile the same network in "auto" mode to exercise the
    # default-threshold path as well.
    comm_fusion_dict = {"allgather": {"mode": "auto", "config": None},
                        "reducescatter": {"mode": "auto", "config": None}}
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=16, global_rank=0,
                                      enable_parallel_optimizer=True, comm_fusion=comm_fusion_dict)
    compile_net(net)


def test_allgather_fusion_invalid_value_failed():
    """
    Feature: test_allgather_fusion with invalid values
    Description: test_allgather_fusion with invalid values
    Expectation: raise TypeError or KeyError
    """
    # A comm_fusion value that is not a dict is rejected outright.
    with pytest.raises(TypeError):
        comm_fusion_dict = [1, 2]
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(TypeError):
        comm_fusion_dict = {"allgather": [1, 2]}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    # In "size" mode the threshold must be an int, not a string.
    with pytest.raises(TypeError):
        comm_fusion_dict = {"allgather": {"mode": "size", "config": "30.12"}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    # Unknown communication types, unknown keys, unknown modes and a
    # missing "config" key all raise KeyError.
    with pytest.raises(KeyError):
        comm_fusion_dict = {"all": {"mode": "size", "config": 30}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"allgather": {"modes": "size", "config": 30}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"allgather": {"mode": "sizes", "config": 30}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"allgather": {"mode": "size"}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)


def test_reducescatter_fusion_invalid_value_failed():
    """
    Feature: test_reducescatter_fusion with invalid values
    Description: test_reducescatter_fusion with invalid values
    Expectation: raise TypeError or KeyError
    """
    with pytest.raises(TypeError):
        comm_fusion_dict = {"reducescatter": [1, 2]}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(TypeError):
        comm_fusion_dict = {"reducescatter": {"mode": "size", "config": "30.12"}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"reducescatter": {"modes": "size", "config": 30}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"reducescatter": {"mode": "sizes", "config": 30}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
    with pytest.raises(KeyError):
        comm_fusion_dict = {"reducescatter": {"mode": "size"}}
        context.set_auto_parallel_context(parallel_mode="semi_auto_parallel", comm_fusion=comm_fusion_dict)
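

Taken together, the negative tests encode a small validation contract for comm_fusion: the argument and each per-type entry must be dicts, exactly the keys "mode" and "config" are accepted, and unknown communication types or modes are rejected. Below is a standalone sketch of that contract as the tests express it — an editor's illustration of the expected behavior, not MindSpore's actual implementation (it also does not distinguish which modes each communication type supports):

_VALID_TYPES = {"allreduce", "allgather", "reducescatter"}
_VALID_MODES = {"auto", "size", "index"}

def check_comm_fusion(comm_fusion):
    # comm_fusion itself and each per-type entry must be dicts.
    if not isinstance(comm_fusion, dict):
        raise TypeError("comm_fusion must be a dict")
    for comm_type, cfg in comm_fusion.items():
        if comm_type not in _VALID_TYPES:
            raise KeyError("unknown communication type: %s" % comm_type)
        if not isinstance(cfg, dict):
            raise TypeError("fusion config for %s must be a dict" % comm_type)
        # Exactly the keys "mode" and "config" are accepted.
        if set(cfg) != {"mode", "config"}:
            raise KeyError("fusion config needs exactly 'mode' and 'config'")
        if cfg["mode"] not in _VALID_MODES:
            raise KeyError("unknown fusion mode: %s" % cfg["mode"])
        # Size-mode thresholds must be ints (e.g. "30.12" is rejected).
        if cfg["mode"] == "size" and not isinstance(cfg["config"], int):
            raise TypeError("'config' must be an int in 'size' mode")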