!65168 Add testcases for dataset_sink_mode

Merge pull request !65168 from maning202007/r2.3
This commit is contained in:
i-robot 2024-02-29 03:50:11 +00:00 committed by Gitee
commit 0837b1d299
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
5 changed files with 88 additions and 269 deletions

View File

@ -234,6 +234,7 @@
"mindspore/tests/st/ops/dynamic_shape/" "too-many-locals"
"mindspore/tests/ut/python/graph_syntax/test_invalid_attribute.py" "misplaced-bare-raise"
"mindspore/tests/st/fallback/test_graph_fallback_runtime_is_isnot.py" "iteral-comparison"
"mindspore/tests/st/dump/dump_test_utils.py" "too-many-nested-blocks"
#MindSpore Lite
"mindspore/mindspore/ccsrc/plugin/device/cpu/kernel/nnacl/experimental/HPC-generator/generator.py" "redefined-builtin"

View File

@ -18,11 +18,11 @@ Utils for testing dump feature.
import json
import os
import time
import glob
import csv
import numpy as np
async_dump_dict = {
"common_dump_settings": {
"dump_mode": 0,
@ -284,8 +284,10 @@ def check_statistic_dump(dump_file_path):
with open(real_path) as f:
reader = csv.DictReader(f)
stats = list(reader)
def get_add_node(statistic):
return statistic['Op Type'] == 'Add'
add_statistics = list(filter(get_add_node, stats))
num_tensors = len(add_statistics)
assert num_tensors == 3
@ -313,3 +315,74 @@ def check_data_dump(dump_file_path, is_ge=False):
output = np.load(real_path)
expect = np.array([[8, 10, 12], [14, 16, 18]], np.float32)
assert np.array_equal(output, expect)
def check_saved_data(iteration_path, saved_data):
    """Verify the files of one iteration directory against the dump's saved_data mode.

    Args:
        iteration_path (str): Directory holding the dump output of a single iteration.
        saved_data (str): 'statistic', 'tensor' or 'full'; a falsy value skips every check.
    """
    if not saved_data:
        return
    expect_statistic = saved_data in ('statistic', 'full')
    expect_tensor = saved_data in ('tensor', 'full')
    if expect_statistic:
        check_statistic_dump(iteration_path)
    if expect_tensor:
        check_data_dump(iteration_path, True)
    if saved_data == 'statistic':
        # statistic.csv must be the only entry, i.e. no tensor data was saved
        entries = os.listdir(iteration_path)
        assert len(entries) == 1
    elif saved_data == 'tensor':
        # only tensor data may be saved, so the statistic file must be absent
        assert not os.path.isfile(os.path.join(iteration_path, 'statistic.csv'))
def check_overflow_file(iteration_path, overflow_num, need_check):
    """Add the number of overflow files found in iteration_path to a running total.

    Args:
        iteration_path (str): Directory searched for "Opdebug.Node_OpDebug.*.*.*" files.
        overflow_num (int): Count accumulated so far.
        need_check (bool): When falsy the directory is not searched.

    Returns:
        int, the updated count (unchanged when need_check is falsy).
    """
    if need_check:
        pattern = os.path.join(iteration_path, "Opdebug.Node_OpDebug.*.*.*")
        return overflow_num + len(glob.glob(pattern))
    return overflow_num
def check_iteration(iteration_id, num_iteration):
    """Assert that a numeric iteration directory name is below num_iteration.

    Args:
        iteration_id (str): Name of an entry inside a model-id directory.
        num_iteration (int): Exclusive upper bound for numeric iteration ids.
    """
    if not iteration_id.isdigit():
        # non-numeric entries are not iteration directories and are not checked
        return
    assert int(iteration_id) < num_iteration
def _iter_iteration_entries(device_dir):
    """Yield (iteration_id, iteration_path) for each <model_name>/<model_id>/<iteration_id> under device_dir."""
    for model_name in os.listdir(device_dir):
        model_path = os.path.join(device_dir, model_name)
        assert os.path.isdir(model_path)
        for model_id in os.listdir(model_path):
            model_id_path = os.path.join(model_path, model_id)
            assert os.path.isdir(model_id_path)
            for iteration_id in os.listdir(model_id_path):
                yield iteration_id, os.path.join(model_id_path, iteration_id)


def check_ge_dump_structure(dump_path, num_iteration, device_num=1, check_overflow=False, saved_data=None,
                            check_data=True):
    """Walk a GE dump directory and assert that its layout matches the expectation.

    The layout checked is <dump_path>/<14-digit time>/<device id>/<model name>/<model id>/<iteration id>.

    Args:
        dump_path (str): Root directory of the dump.
        num_iteration (int): Exclusive upper bound for numeric iteration directory names.
        device_num (int): Expected number of device directories, summed over all time directories.
        check_overflow (bool): When True, overflow files are counted and at least one must exist.
        saved_data (str): Forwarded to check_saved_data for every iteration directory.
        check_data (bool): When False, check_saved_data is skipped.
    """
    overflow_num = 0
    # The dump may still be being written: poll up to 3 times, 2 seconds apart. A directory that
    # does not exist yet is treated like an empty one instead of letting os.listdir raise.
    for _ in range(3):
        if os.path.isdir(dump_path) and os.listdir(dump_path):
            break
        time.sleep(2)
    sub_paths = os.listdir(dump_path)
    assert sub_paths
    device_path_num = 0
    for sub_path in sub_paths:
        # on GE, the whole dump directory of one training is saved within a time path, like '20230822120819'
        if not (sub_path.isdigit() and len(sub_path) == 14):
            continue
        time_path = os.path.join(dump_path, sub_path)
        assert os.path.isdir(time_path)
        device_paths = os.listdir(time_path)
        device_path_num += len(device_paths)
        for device_path in device_paths:
            assert device_path.isdigit()
            abs_device_path = os.path.join(time_path, device_path)
            assert os.path.isdir(abs_device_path)
            for iteration_id, iteration_path in _iter_iteration_entries(abs_device_path):
                check_iteration(iteration_id, num_iteration)
                assert os.path.isdir(iteration_path)
                if check_data:
                    check_saved_data(iteration_path, saved_data)
                overflow_num = check_overflow_file(iteration_path, overflow_num, check_overflow)
    assert device_path_num == device_num
    if check_overflow:
        assert overflow_num

View File

@ -13,11 +13,9 @@
# limitations under the License.
# ============================================================================
import os
import sys
import tempfile
import time
import shutil
import glob
import numpy as np
import pytest
from mindspore import context, Model, nn
@ -25,7 +23,7 @@ from mindspore.nn import SoftmaxCrossEntropyWithLogits, Accuracy
from mindspore.common import set_seed
from mindspore.common.initializer import Normal
import mindspore.dataset as ds
from dump_test_utils import generate_dump_json, generate_statistic_dump_json, check_dump_structure
from dump_test_utils import generate_dump_json, check_ge_dump_structure
from tests.security_utils import security_off_wrap
set_seed(1)
@ -88,20 +86,19 @@ def run_async_dump(test_name):
dump_config_path = os.path.join(tmp_dir, 'async_dump.json')
generate_dump_json(dump_path, dump_config_path, test_name, 'LeNet')
os.environ['MINDSPORE_DUMP_CONFIG'] = dump_config_path
dump_file_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', '0')
if os.path.isdir(dump_path):
shutil.rmtree(dump_path)
train_net(1, 1, True)
for _ in range(3):
if not os.path.exists(dump_file_path):
if not os.path.exists(dump_path):
time.sleep(2)
check_dump_structure(dump_path, dump_config_path, 1, 1, 1, [1], [0])
constant_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', 'constants')
assert os.path.exists(constant_path)
check_ge_dump_structure(dump_path, 1, 1)
del os.environ['MINDSPORE_DUMP_CONFIG']
@pytest.mark.level1
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
@security_off_wrap
def test_async_dump_dataset_sink():
@ -111,172 +108,3 @@ def test_async_dump_dataset_sink():
Expectation: dump data are generated as protobuf file format (suffix with timestamp)
"""
run_async_dump("test_async_dump_dataset_sink")
def run_e2e_dump():
    """Run lenet with sync dump and check the dump directory it produces.

    Does nothing on platforms other than linux. MINDSPORE_DUMP_CONFIG is set for the duration of
    the run and is always removed afterwards, even when training or a check fails.
    """
    if sys.platform != 'linux':
        return
    with tempfile.TemporaryDirectory(dir='/tmp') as tmp_dir:
        dump_path = os.path.join(tmp_dir, 'e2e_dump')
        dump_config_path = os.path.join(tmp_dir, 'e2e_dump.json')
        generate_dump_json(dump_path, dump_config_path, 'test_e2e_dump', 'LeNet')
        os.environ['MINDSPORE_DUMP_CONFIG'] = dump_config_path
        try:
            dump_file_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', '0')
            if os.path.isdir(dump_path):
                shutil.rmtree(dump_path)
            train_net(1, 1, True)
            # give the dump up to 3 x 2 seconds to appear on disk
            for _ in range(3):
                if not os.path.exists(dump_file_path):
                    time.sleep(2)
            check_dump_structure(dump_path, dump_config_path, 1, 1, 1, [1], [0])
            constant_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', 'constants')
            assert os.path.exists(constant_path)
        finally:
            # do not leak the dump configuration into tests that run afterwards
            os.environ.pop('MINDSPORE_DUMP_CONFIG', None)
@pytest.mark.level1
@pytest.mark.env_onecard
@security_off_wrap
def test_e2e_dump():
    """
    Feature: sync dump on Ascend.
    Description: run the sync dump scenario (dataset_sink_mode=True) in graph mode on Ascend.
    Expectation: dump data are generated.
    """
    ascend_graph_mode = {"mode": context.GRAPH_MODE, "device_target": "Ascend"}
    context.set_context(**ascend_graph_mode)
    run_e2e_dump()
@pytest.mark.level1
@pytest.mark.env_onecard
@security_off_wrap
def test_e2e_dump_with_hccl_env():
    """
    Feature: sync dump on Ascend.
    Description: test sync dump with dataset_sink_mode=True, RANK_TABLE_FILE and RANK_ID envs are set.
    Expectation: dump data are generated.
    """
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    os.environ["RANK_TABLE_FILE"] = "invalid_file.json"
    os.environ["RANK_ID"] = "4"
    try:
        run_e2e_dump()
    finally:
        # always drop the HCCL settings so a failing run cannot leak them into later tests
        os.environ.pop('RANK_TABLE_FILE', None)
        os.environ.pop('RANK_ID', None)
@pytest.mark.level1
@pytest.mark.env_onecard
@security_off_wrap
def test_dump_with_diagnostic_path():
    """
    Feature: Sync dump on Ascend.
    Description: Test sync dump with dataset_sink_mode=True when path is not set (set to empty) in dump json file and
    MS_DIAGNOSTIC_DATA_PATH is set.
    Expectation: Data is expected to be dumped into MS_DIAGNOSTIC_DATA_PATH/debug_dump.
    """
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    with tempfile.TemporaryDirectory(dir='/tmp') as tmp_dir:
        dump_config_path = os.path.join(tmp_dir, 'e2e_dump.json')
        # the dump path in the json file is left empty on purpose
        generate_dump_json('', dump_config_path, 'test_e2e_dump', 'LeNet')
        os.environ['MINDSPORE_DUMP_CONFIG'] = dump_config_path
        diagnose_path = os.path.join(tmp_dir, 'e2e_dump')
        os.environ['MS_DIAGNOSTIC_DATA_PATH'] = diagnose_path
        try:
            if os.path.isdir(diagnose_path):
                shutil.rmtree(diagnose_path)
            train_net(1, 1, True)
            dump_path = os.path.join(diagnose_path, 'debug_dump')
            dump_file_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', '0')
            # give the dump up to 3 x 2 seconds to appear on disk
            for _ in range(3):
                if not os.path.exists(dump_file_path):
                    time.sleep(2)
            check_dump_structure(dump_path, dump_config_path, 1, 1, 1, [1], [0])
            constant_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', 'constants')
            assert os.path.exists(constant_path)
        finally:
            # restore the environment even when training or a check fails
            os.environ.pop('MINDSPORE_DUMP_CONFIG', None)
            os.environ.pop('MS_DIAGNOSTIC_DATA_PATH', None)
def check_statistic_dump(dump_file_path):
    """Check that a non-empty statistic.csv exists in dump_file_path.

    Args:
        dump_file_path (str): Directory expected to contain the statistic file.

    Raises:
        AssertionError: If the file is missing or has size 0.
    """
    output_name = "statistic.csv"
    matches = glob.glob(os.path.join(dump_file_path, output_name))
    # fail with a readable message instead of an IndexError when nothing was dumped
    assert matches, "{} not found in {}".format(output_name, dump_file_path)
    real_path = os.path.realpath(matches[0])
    assert os.path.getsize(real_path)
def check_data_dump(dump_file_path):
    """Assert that dump_file_path holds more than 11 "*.npy" tensor files."""
    npy_pattern = os.path.join(dump_file_path, "*.npy")
    assert len(glob.glob(npy_pattern)) > 11
def run_saved_data_dump_test(scenario, saved_data):
    """Run e2e dump on scenario, testing the saved_data field in dump config file.

    Does nothing on platforms other than linux. MINDSPORE_DUMP_CONFIG is set for the duration of
    the run and is always removed afterwards, even when training or a check fails.

    Args:
        scenario (str): Scenario name passed to generate_statistic_dump_json.
        saved_data (str): 'statistic', 'tensor' or 'full'; selects which dump files are expected.
    """
    if sys.platform != 'linux':
        return
    with tempfile.TemporaryDirectory(dir='/tmp') as tmp_dir:
        dump_path = os.path.join(tmp_dir, 'test_saved_data')
        dump_config_path = os.path.join(tmp_dir, 'test_saved_data.json')
        generate_statistic_dump_json(dump_path, dump_config_path, scenario, saved_data, 'LeNet')
        os.environ['MINDSPORE_DUMP_CONFIG'] = dump_config_path
        try:
            dump_file_path = os.path.join(dump_path, 'rank_0', 'LeNet', '1', '0')
            if os.path.isdir(dump_path):
                shutil.rmtree(dump_path)
            train_net(1, 1, True)
            # give the dump up to 3 x 2 seconds to appear on disk
            for _ in range(3):
                if not os.path.exists(dump_file_path):
                    time.sleep(2)
            check_dump_structure(dump_path, dump_config_path, 1, 1, 1, [1], [0])
            if saved_data in ('statistic', 'full'):
                check_statistic_dump(dump_file_path)
            if saved_data in ('tensor', 'full'):
                check_data_dump(dump_file_path)
            if saved_data == 'statistic':
                # assert only file is statistic.csv, tensor data is not saved
                assert len(os.listdir(dump_file_path)) == 1
            elif saved_data == 'tensor':
                # assert only tensor data is saved, not statistics
                stat_path = os.path.join(dump_file_path, 'statistic.csv')
                assert not os.path.isfile(stat_path)
        finally:
            # do not leak the dump configuration into tests that run afterwards
            os.environ.pop('MINDSPORE_DUMP_CONFIG', None)
@pytest.mark.level0
@pytest.mark.env_onecard
@security_off_wrap
def test_ascend_statistic_dump():
    """
    Feature: Ascend Statistics Dump
    Description: Run the saved-data dump test with saved_data set to 'statistic'
    Expectation: Statistics are stored in statistic.csv files
    """
    scenario, saved_data = 'test_async_dump', 'statistic'
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    run_saved_data_dump_test(scenario, saved_data)
@pytest.mark.level0
@pytest.mark.env_onecard
@security_off_wrap
def test_ascend_tensor_dump():
    """
    Feature: Ascend Tensor Dump
    Description: Run the saved-data dump test with saved_data set to 'tensor'
    Expectation: Tensors are stored in npy files
    """
    scenario, saved_data = 'test_async_dump', 'tensor'
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    run_saved_data_dump_test(scenario, saved_data)
@pytest.mark.level1
@pytest.mark.env_onecard
@security_off_wrap
def test_ascend_full_dump():
    """
    Feature: Ascend Full Dump
    Description: Run the saved-data dump test with saved_data set to 'full'
    Expectation: Tensors are stored in npy files and their statistics stored in statistic.csv
    """
    scenario, saved_data = 'test_async_dump', 'full'
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
    run_saved_data_dump_test(scenario, saved_data)

View File

@ -15,15 +15,14 @@
import os
import sys
import tempfile
import glob
import time
import shutil
import pytest
import numpy as np
from dump_test_utils import generate_dump_json, check_dump_structure
from dump_test_utils import generate_dump_json, check_ge_dump_structure
@pytest.mark.level1
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_single
def test_dump_hccl():
"""
@ -38,24 +37,10 @@ def test_dump_hccl():
dump_config_path = os.path.join(tmp_dir, 'test_dump_hccl.json')
generate_dump_json(dump_path, dump_config_path, 'test_async_dump_npy')
os.environ['MINDSPORE_DUMP_CONFIG'] = dump_config_path
dump_file_path = os.path.join(dump_path, 'rank_0', 'Net', '0', '0')
if os.path.isdir(dump_path):
shutil.rmtree(dump_path)
exec_network_cmd = 'cd {0}; bash run_allreduce.sh'.format(os.path.split(os.path.realpath(__file__))[0])
ret = os.system(exec_network_cmd)
print("ret of exec_network_cmd: ", ret)
for _ in range(3):
if not os.path.exists(dump_file_path):
time.sleep(2)
check_dump_structure(dump_path, dump_config_path, 8, 1, 1)
output_name = "AllReduce.AllReduce-op*.*.*.*.output.0.*.npy"
# Check data in 8 cards.
for i in range(8):
dump_file_path = os.path.join(dump_path, 'rank_{}'.format(i), 'Net', '0', '0')
output_path = glob.glob(os.path.join(dump_file_path, output_name))[0]
real_path = os.path.realpath(output_path)
output = np.load(real_path)
expect = [[28]]
assert output.shape == (1, 1)
assert np.array_equal(output, expect)
check_ge_dump_structure(dump_path, 1, 8, saved_data='tensor', check_data=False)
del os.environ['MINDSPORE_DUMP_CONFIG']

View File

@ -16,7 +16,6 @@ import os
import sys
import tempfile
import shutil
import glob
import numpy as np
import pytest
import time
@ -33,13 +32,11 @@ from mindspore.nn import SoftmaxCrossEntropyWithLogits
from mindspore.nn import Momentum
from mindspore.nn import TrainOneStepCell
from mindspore.nn import WithLossCell
from dump_test_utils import generate_dump_json, generate_dump_json_with_overflow, \
generate_statistic_dump_json, check_statistic_dump, check_data_dump
from dump_test_utils import generate_dump_json, generate_dump_json_with_overflow, generate_statistic_dump_json, \
check_ge_dump_structure, check_saved_data, check_iteration, check_overflow_file
from tests.security_utils import security_off_wrap
class Net(nn.Cell):
def __init__(self):
super(Net, self).__init__()
@ -62,72 +59,6 @@ x = np.array([[1, 2, 3], [4, 5, 6]]).astype(np.float32)
y = np.array([[7, 8, 9], [10, 11, 12]]).astype(np.float32)
def check_saved_data(iteration_path, saved_data):
    """Check one iteration directory against the saved_data mode of the dump config.

    Args:
        iteration_path (str): Directory holding the dump output of a single iteration.
        saved_data (str): 'statistic', 'tensor' or 'full'; nothing is checked for a falsy value.
    """
    if not saved_data:
        return
    full_dump = saved_data == 'full'
    if full_dump or saved_data == 'statistic':
        check_statistic_dump(iteration_path)
    if full_dump or saved_data == 'tensor':
        check_data_dump(iteration_path, True)
    if saved_data == 'statistic':
        # the statistic file has to be the only entry: tensor data is not saved
        dumped_entries = os.listdir(iteration_path)
        assert len(dumped_entries) == 1
    elif saved_data == 'tensor':
        # tensor-only dump: the statistic file must not exist
        statistic_file = os.path.join(iteration_path, 'statistic.csv')
        assert not os.path.isfile(statistic_file)
def check_overflow_file(iteration_path, overflow_num, need_check):
    """Return overflow_num increased by the number of overflow files in iteration_path.

    The directory is only searched (for "Opdebug.Node_OpDebug.*.*.*") when need_check is truthy;
    otherwise overflow_num is returned unchanged.
    """
    found = 0
    if need_check:
        found = len(glob.glob(os.path.join(iteration_path, "Opdebug.Node_OpDebug.*.*.*")))
    return overflow_num + found
def check_iteration(iteration_id, num_iteration):
    """Assert that iteration_id, when it is numeric, is smaller than num_iteration."""
    is_numeric = iteration_id.isdigit()
    if is_numeric:
        iteration = int(iteration_id)
        assert iteration < num_iteration
def check_ge_dump_structure(dump_path, num_iteration, device_num=1, check_overflow=False, saved_data=None):
    """Walk a GE dump directory and assert that its layout matches the expectation.

    The layout checked is <dump_path>/<14-digit time>/<device id>/<model name>/<model id>/<iteration id>.

    Args:
        dump_path (str): Root directory of the dump.
        num_iteration (int): Exclusive upper bound for numeric iteration directory names.
        device_num (int): Expected number of device directories inside each time directory.
        check_overflow (bool): When True, overflow files are counted and at least one must exist.
        saved_data (str): Forwarded to check_saved_data for every iteration directory.
    """

    def iteration_entries(device_dir):
        # lazily yields (iteration_id, iteration_path) for every model name / model id below device_dir
        for model_name in os.listdir(device_dir):
            model_dir = os.path.join(device_dir, model_name)
            assert os.path.isdir(model_dir)
            for model_id in os.listdir(model_dir):
                model_id_dir = os.path.join(model_dir, model_id)
                assert os.path.isdir(model_id_dir)
                for iteration_id in os.listdir(model_id_dir):
                    yield iteration_id, os.path.join(model_id_dir, iteration_id)

    overflow_total = 0
    # wait up to 3 x 2 seconds for the dump directory to be created
    for _ in range(3):
        if not os.path.exists(dump_path):
            time.sleep(2)
    for entry in os.listdir(dump_path):
        # on GE, the whole dump directory of one training is saved within a time path, like '20230822120819'
        is_time_dir = entry.isdigit() and len(entry) == 14
        if not is_time_dir:
            continue
        time_dir = os.path.join(dump_path, entry)
        assert os.path.isdir(time_dir)
        device_names = os.listdir(time_dir)
        assert len(device_names) == device_num
        for device_name in device_names:
            assert device_name.isdigit()
            device_dir = os.path.join(time_dir, device_name)
            assert os.path.isdir(device_dir)
            for iteration_id, iteration_dir in iteration_entries(device_dir):
                check_iteration(iteration_id, num_iteration)
                assert os.path.isdir(iteration_dir)
                check_saved_data(iteration_dir, saved_data)
                overflow_total = check_overflow_file(iteration_dir, overflow_total, check_overflow)
    if check_overflow:
        assert overflow_total
def check_ge_dump_structure_acl(dump_path, num_iteration, device_num=1, check_overflow=False, saved_data=None):
overflow_num = 0
for _ in range(3):
@ -472,6 +403,7 @@ def test_ge_full_dump():
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
run_saved_data_dump_test('test_ge_dump', 'full')
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training