diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index 593ac5da7f5..7bfd9c8a3cf 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -529,7 +529,8 @@ Status CacheAdminArgHandler::StopServer(CommandId command_id) { if (rc.IsError()) { msg.RemoveResourcesOnExit(); if (rc == StatusCode::kMDNetWorkError) { - std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already."; + std::string errMsg = + "Server on port " + std::to_string(port_) + " is not reachable or has been shutdown already."; return Status(StatusCode::kMDNetWorkError, errMsg); } return rc; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc index 3fc6788157c..7048dffad35 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc @@ -41,15 +41,15 @@ Status CacheClient::Builder::Build(std::shared_ptr *out) { } Status CacheClient::Builder::SanityCheck() { - CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative. (0 implies unlimited"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "rpc connections must be positive"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ > 1024, "Port must be in range (1025..65535)"); - CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= 65535, "Port must be in range (1025..65535)"); + CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative (0 implies unlimited)."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "number of tcp/ip connections must be positive."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ > 1024, "Port must be in range (1025..65535)."); + CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= 65535, "Port must be in range (1025..65535)."); CHECK_FAIL_RETURN_SYNTAX_ERROR(hostname_ == "127.0.0.1", - "now cache client has to be on the same host with cache server"); + "now cache client has to be on the same host with cache server."); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h index 03cdf1daf97..3c270da3138 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h @@ -73,7 +73,11 @@ inline void Status2CacheReply(const Status &rc, CacheReply *reply) { /// \param port /// \return unix socket url inline std::string PortToUnixSocketPath(int port) { - return kDefaultCommonPath + std::string("/cache_server_p") + std::to_string(port); +#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) + return kDefaultCommonPath + Services::GetUserName() + std::string("/cache_server_p") + std::to_string(port); +#else + return kDefaultCommonPath; +#endif } /// \brief Round up to the next 4k diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_ipc.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_ipc.cc index ae75d064d18..f397d92618f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_ipc.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_ipc.cc @@ -42,7 +42,7 @@ SharedMessage::~SharedMessage() { Status SharedMessage::Create() { CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ == -1, "Message queue already created"); - auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP; + auto access_mode = S_IRUSR | S_IWUSR; msg_qid_ = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode); if (msg_qid_ == -1) { std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno); @@ -99,7 +99,7 @@ SharedMemory::~SharedMemory() { } Status SharedMemory::Create(int64_t sz) { - auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP; + auto access_mode = S_IRUSR | S_IWUSR; shm_id_ = shmget(shm_key_, sz, IPC_CREAT | IPC_EXCL | access_mode); if (shm_id_ == -1) { RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc index 6744b02778f..22b148a690a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc @@ -374,6 +374,7 @@ uint32_t DatasetOp::GenerateCRC(const std::shared_ptr &op) { std::string ss_str = ss.str(); // Filter out the Num workers field when generating the check sum + ss_str = std::regex_replace(ss_str, std::regex("Number of ShardReader workers.*\n"), ""); ss_str = std::regex_replace(ss_str, std::regex("Num workers.*\n"), ""); ss_str = std::regex_replace(ss_str, std::regex("\\[workers.*?\\]"), ""); ss_str = std::regex_replace(ss_str, std::regex("Connector queue size.*\n"), ""); diff --git a/mindspore/dataset/core/validator_helpers.py b/mindspore/dataset/core/validator_helpers.py index dc3f298c637..c67e6ea09de 100644 --- a/mindspore/dataset/core/validator_helpers.py +++ b/mindspore/dataset/core/validator_helpers.py @@ -129,6 +129,11 @@ def check_uint32(value, arg_name=""): check_value(value, [UINT32_MIN, UINT32_MAX]) +def check_pos_uint32(value, arg_name=""): + type_check(value, (int,), arg_name) + check_value(value, [POS_INT_MIN, UINT32_MAX]) + + def check_pos_int32(value, arg_name=""): type_check(value, (int,), arg_name) check_value(value, [POS_INT_MIN, INT32_MAX], arg_name) diff --git a/mindspore/dataset/engine/cache_client.py b/mindspore/dataset/engine/cache_client.py index f9ab70527a6..c5cb54c9f01 100644 --- a/mindspore/dataset/engine/cache_client.py +++ b/mindspore/dataset/engine/cache_client.py @@ -18,7 +18,8 @@ import copy from mindspore._c_dataengine import CacheClient -from ..core.validator_helpers import type_check, check_uint32, check_uint64, check_positive, check_value +from ..core.validator_helpers import type_check, check_pos_int32, check_pos_uint32, check_uint64, check_positive, \ + check_value class DatasetCache: @@ -50,7 +51,7 @@ class DatasetCache: def __init__(self, session_id, size=0, spilling=False, hostname=None, port=None, num_connections=None, prefetch_size=None): - check_uint32(session_id, "session_id") + check_pos_uint32(session_id, "session_id") type_check(size, (int,), "size") if size != 0: check_positive(size, "size") @@ -62,9 +63,9 @@ class DatasetCache: type_check(port, (int,), "port") check_value(port, (1025, 65535), "port") if num_connections is not None: - check_uint32(num_connections, "num_connections") + check_pos_int32(num_connections, "num_connections") if prefetch_size is not None: - check_uint32(prefetch_size, "prefetch_size") + check_pos_int32(prefetch_size, "prefetch_size") self.session_id = session_id self.size = size diff --git a/model_zoo/official/cv/mobilenetv2/scripts/cache_util.sh b/model_zoo/official/cv/mobilenetv2/scripts/cache_util.sh index 10357577e4a..a3aa77e54a8 100644 --- a/model_zoo/official/cv/mobilenetv2/scripts/cache_util.sh +++ b/model_zoo/official/cv/mobilenetv2/scripts/cache_util.sh @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/bin/bash # Copyright 2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,8 +42,8 @@ shutdown_cache_server() result=$(cache_admin --stop 2>&1) rc=$? echo "${result}" - if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not up or has been shutdown already" ]]; then + if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not reachable or has been shutdown already" ]]; then echo "cache_admin command failure!" "${result}" exit 1 fi -} \ No newline at end of file +} diff --git a/model_zoo/official/cv/mobilenetv2/scripts/run_train_nfs_cache.sh b/model_zoo/official/cv/mobilenetv2/scripts/run_train_nfs_cache.sh index 4abe588b3a6..7912341cfde 100644 --- a/model_zoo/official/cv/mobilenetv2/scripts/run_train_nfs_cache.sh +++ b/model_zoo/official/cv/mobilenetv2/scripts/run_train_nfs_cache.sh @@ -92,6 +92,7 @@ run_ascend() &> log$i.log & cd .. done + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" } run_gpu() @@ -149,6 +150,7 @@ run_gpu() --enable_cache=True \ --cache_session_id=$CACHE_SESSION_ID \ &> ../train.log & # dataset train folder + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" } run_cpu() @@ -198,6 +200,7 @@ run_cpu() --enable_cache=True \ --cache_session_id=$CACHE_SESSION_ID \ &> ../train.log & # dataset train folder + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" } if [ $1 = "Ascend" ] ; then diff --git a/model_zoo/official/cv/mobilenetv2/train.py b/model_zoo/official/cv/mobilenetv2/train.py index 281acbf9591..f8c9be5eb4f 100644 --- a/model_zoo/official/cv/mobilenetv2/train.py +++ b/model_zoo/official/cv/mobilenetv2/train.py @@ -131,3 +131,6 @@ if __name__ == '__main__': if (epoch + 1) % config.save_checkpoint_epochs == 0: save_checkpoint(net, os.path.join(save_ckpt_path, f"mobilenetv2_{epoch+1}.ckpt")) print("total cost {:5.4f} s".format(time.time() - start)) + + if args_opt.enable_cache: + print("Remember to shut down the cache server via \"cache_admin --stop\"") diff --git a/model_zoo/official/cv/resnet/scripts/cache_util.sh b/model_zoo/official/cv/resnet/scripts/cache_util.sh index 10357577e4a..a3aa77e54a8 100644 --- a/model_zoo/official/cv/resnet/scripts/cache_util.sh +++ b/model_zoo/official/cv/resnet/scripts/cache_util.sh @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/bin/bash # Copyright 2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,8 +42,8 @@ shutdown_cache_server() result=$(cache_admin --stop 2>&1) rc=$? echo "${result}" - if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not up or has been shutdown already" ]]; then + if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not reachable or has been shutdown already" ]]; then echo "cache_admin command failure!" "${result}" exit 1 fi -} \ No newline at end of file +} diff --git a/model_zoo/official/cv/resnet/scripts/run_distribute_train.sh b/model_zoo/official/cv/resnet/scripts/run_distribute_train.sh index f2727ba0b8b..662a4274c94 100755 --- a/model_zoo/official/cv/resnet/scripts/run_distribute_train.sh +++ b/model_zoo/official/cv/resnet/scripts/run_distribute_train.sh @@ -134,6 +134,10 @@ do then python train.py --net=$1 --dataset=$2 --run_distribute=True --device_num=$DEVICE_NUM --dataset_path=$PATH2 \ --run_eval=$RUN_EVAL --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log & + if [ "x${RUN_EVAL}" == "xTrue" ] + then + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" + fi fi cd .. done diff --git a/model_zoo/official/cv/resnet/scripts/run_distribute_train_gpu.sh b/model_zoo/official/cv/resnet/scripts/run_distribute_train_gpu.sh index 930d002cff4..4d7217635bc 100755 --- a/model_zoo/official/cv/resnet/scripts/run_distribute_train_gpu.sh +++ b/model_zoo/official/cv/resnet/scripts/run_distribute_train_gpu.sh @@ -120,4 +120,8 @@ then python train.py --net=$1 --dataset=$2 --run_distribute=True \ --device_num=$DEVICE_NUM --device_target="GPU" --dataset_path=$PATH1 --run_eval=$RUN_EVAL \ --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log & + if [ "x${RUN_EVAL}" == "xTrue" ] + then + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" + fi fi diff --git a/model_zoo/official/cv/resnet/scripts/run_standalone_train.sh b/model_zoo/official/cv/resnet/scripts/run_standalone_train.sh index 85cdce4ee3f..44d3c7555c7 100755 --- a/model_zoo/official/cv/resnet/scripts/run_standalone_train.sh +++ b/model_zoo/official/cv/resnet/scripts/run_standalone_train.sh @@ -123,5 +123,9 @@ if [ $# == 5 ] then python train.py --net=$1 --dataset=$2 --dataset_path=$PATH1 --run_eval=$RUN_EVAL \ --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log & + if [ "x${RUN_EVAL}" == "xTrue" ] + then + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" + fi fi cd .. diff --git a/model_zoo/official/cv/resnet/scripts/run_standalone_train_gpu.sh b/model_zoo/official/cv/resnet/scripts/run_standalone_train_gpu.sh index f8effa70e14..19433be36cb 100755 --- a/model_zoo/official/cv/resnet/scripts/run_standalone_train_gpu.sh +++ b/model_zoo/official/cv/resnet/scripts/run_standalone_train_gpu.sh @@ -17,7 +17,7 @@ CURPATH="$(dirname "$0")" . ${CURPATH}/cache_util.sh -if [ $# != 3 ] && [ $# != 4 ] +if [ $# != 3 ] && [ $# != 4 ] && [ $# != 5 ] then echo "Usage: bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [PRETRAINED_CKPT_PATH](optional)" echo " bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)" @@ -120,5 +120,9 @@ if [ $# == 5 ] then python train.py --net=$1 --dataset=$2 --device_target="GPU" --dataset_path=$PATH1 --run_eval=$RUN_EVAL \ --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log & + if [ "x${RUN_EVAL}" == "xTrue" ] + then + echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\"" + fi fi cd .. diff --git a/model_zoo/official/cv/resnet/train.py b/model_zoo/official/cv/resnet/train.py index 4298331aaf2..1c6fb2aa572 100755 --- a/model_zoo/official/cv/resnet/train.py +++ b/model_zoo/official/cv/resnet/train.py @@ -266,3 +266,6 @@ if __name__ == '__main__': dataset_sink_mode = (not args_opt.parameter_server) and target != "CPU" model.train(config.epoch_size - config.pretrain_epoch_size, dataset, callbacks=cb, sink_size=dataset.get_dataset_size(), dataset_sink_mode=dataset_sink_mode) + + if args_opt.run_eval and args_opt.enable_cache: + print("Remember to shut down the cache server via \"cache_admin --stop\"") diff --git a/tests/ut/python/cachetests/cachetest_args.sh b/tests/ut/python/cachetests/cachetest_args.sh index 68e5ef9f36e..6c69175a62e 100755 --- a/tests/ut/python/cachetests/cachetest_args.sh +++ b/tests/ut/python/cachetests/cachetest_args.sh @@ -55,8 +55,8 @@ cmd="${CACHE_ADMIN} --start --spilldir /path_that_does_not_exist" CacheAdminCmd "${cmd}" 1 HandleRcExit $? 0 0 -# stop cache server first to test start -StopServer +# clean up cache server first to test start +ServerCleanup # start cache server StartServer HandleRcExit $? 1 1 diff --git a/tests/ut/python/dataset/test_cache_map.py b/tests/ut/python/dataset/test_cache_map.py index 24db7dcb311..197051b5d71 100644 --- a/tests/ut/python/dataset/test_cache_map.py +++ b/tests/ut/python/dataset/test_cache_map.py @@ -56,7 +56,7 @@ def test_cache_map_basic1(): if "SESSION_ID" in os.environ: session_id = int(os.environ['SESSION_ID']) else: - session_id = 1 + raise RuntimeError("Testcase requires SESSION_ID environment variable") some_cache = ds.DatasetCache(session_id=session_id, size=0) @@ -1970,7 +1970,7 @@ def test_cache_map_mindrecord1(): if "SESSION_ID" in os.environ: session_id = int(os.environ['SESSION_ID']) else: - session_id = 1 + raise RuntimeError("Testcase requires SESSION_ID environment variable") some_cache = ds.DatasetCache(session_id=session_id, size=0) @@ -2006,7 +2006,7 @@ def test_cache_map_mindrecord2(): if "SESSION_ID" in os.environ: session_id = int(os.environ['SESSION_ID']) else: - session_id = 1 + raise RuntimeError("Testcase requires SESSION_ID environment variable") some_cache = ds.DatasetCache(session_id=session_id, size=0) @@ -2029,6 +2029,45 @@ def test_cache_map_mindrecord2(): logger.info("test_cache_map_mindrecord2 Ended.\n") +@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") +def test_cache_map_mindrecord3(): + """ + Test cache sharing between the following two pipelines with mindrecord leaf: + + Cache Cache + | | + Map(decode) Map(decode) + | | + MindRecord(num_parallel_workers=5) MindRecord(num_parallel_workers=6) + """ + + logger.info("Test cache map mindrecord3") + if "SESSION_ID" in os.environ: + session_id = int(os.environ['SESSION_ID']) + else: + raise RuntimeError("Testcase requires SESSION_ID environment variable") + + some_cache = ds.DatasetCache(session_id=session_id, size=0) + + # This dataset has 5 records + columns_list = ["id", "file_name", "label_name", "img_data", "label_data"] + decode_op = c_vision.Decode() + + ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list=columns_list, num_parallel_workers=5, shuffle=True) + ds1 = ds1.map(input_columns=["img_data"], operations=decode_op, cache=some_cache) + + ds2 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list=columns_list, num_parallel_workers=6, shuffle=True) + ds2 = ds2.map(input_columns=["img_data"], operations=decode_op, cache=some_cache) + + iter1 = ds1.create_dict_iterator(num_epochs=1, output_numpy=True) + iter2 = ds2.create_dict_iterator(num_epochs=1, output_numpy=True) + + assert sum([1 for _ in iter1]) == 5 + assert sum([1 for _ in iter2]) == 5 + + logger.info("test_cache_map_mindrecord3 Ended.\n") + + @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") def test_cache_map_python_sampler1(): """