change shared mem perm to 600 & other minor fixes

Lixia Chen 2021-05-13 20:40:58 -04:00
parent 40aa51bbd3
commit 8812111f94
18 changed files with 104 additions and 28 deletions


@@ -529,7 +529,8 @@ Status CacheAdminArgHandler::StopServer(CommandId command_id) {
   if (rc.IsError()) {
     msg.RemoveResourcesOnExit();
     if (rc == StatusCode::kMDNetWorkError) {
-      std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already.";
+      std::string errMsg =
+        "Server on port " + std::to_string(port_) + " is not reachable or has been shutdown already.";
       return Status(StatusCode::kMDNetWorkError, errMsg);
     }
     return rc;


@@ -41,15 +41,15 @@ Status CacheClient::Builder::Build(std::shared_ptr<CacheClient> *out) {
 }
 
 Status CacheClient::Builder::SanityCheck() {
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative. (0 implies unlimited");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "rpc connections must be positive");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ > 1024, "Port must be in range (1025..65535)");
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= 65535, "Port must be in range (1025..65535)");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative (0 implies unlimited).");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "number of tcp/ip connections must be positive.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ > 1024, "Port must be in range (1025..65535).");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= 65535, "Port must be in range (1025..65535).");
   CHECK_FAIL_RETURN_SYNTAX_ERROR(hostname_ == "127.0.0.1",
-                                 "now cache client has to be on the same host with cache server");
+                                 "now cache client has to be on the same host with cache server.");
   return Status::OK();
 }


@@ -73,7 +73,11 @@ inline void Status2CacheReply(const Status &rc, CacheReply *reply) {
 /// \param port
 /// \return unix socket url
 inline std::string PortToUnixSocketPath(int port) {
-  return kDefaultCommonPath + std::string("/cache_server_p") + std::to_string(port);
+#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
+  return kDefaultCommonPath + Services::GetUserName() + std::string("/cache_server_p") + std::to_string(port);
+#else
+  return kDefaultCommonPath;
+#endif
 }
 
 /// \brief Round up to the next 4k
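On Linux the socket path now embeds the user name, so cache servers started by different users on one host get distinct socket files. A minimal standalone sketch of the resulting path shape follows; the value of kDefaultCommonPath and the getenv-based user lookup are assumptions for illustration, not the repository's actual definitions.

#include <cstdlib>
#include <iostream>
#include <string>

// Assumed value for illustration; the real kDefaultCommonPath is defined elsewhere.
const std::string kDefaultCommonPath = "/tmp/mindspore/cache/";

// Stand-in for Services::GetUserName(); the real implementation differs.
std::string GetUserName() {
  const char *user = std::getenv("USER");
  return user != nullptr ? std::string(user) : "unknown";
}

std::string PortToUnixSocketPath(int port) {
  return kDefaultCommonPath + GetUserName() + "/cache_server_p" + std::to_string(port);
}

int main() {
  // e.g. /tmp/mindspore/cache/alice/cache_server_p50052 for user alice
  std::cout << PortToUnixSocketPath(50052) << "\n";
  return 0;
}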


@@ -42,7 +42,7 @@ SharedMessage::~SharedMessage() {
 
 Status SharedMessage::Create() {
   CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ == -1, "Message queue already created");
-  auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP;
+  auto access_mode = S_IRUSR | S_IWUSR;
   msg_qid_ = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
   if (msg_qid_ == -1) {
     std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
@@ -99,7 +99,7 @@ SharedMemory::~SharedMemory() {
 }
 
 Status SharedMemory::Create(int64_t sz) {
-  auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP;
+  auto access_mode = S_IRUSR | S_IWUSR;
   shm_id_ = shmget(shm_key_, sz, IPC_CREAT | IPC_EXCL | access_mode);
   if (shm_id_ == -1) {
     RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno));


@@ -374,6 +374,7 @@ uint32_t DatasetOp::GenerateCRC(const std::shared_ptr<DatasetOp> &op) {
   std::string ss_str = ss.str();
 
   // Filter out the Num workers field when generating the check sum
+  ss_str = std::regex_replace(ss_str, std::regex("Number of ShardReader workers.*\n"), "");
   ss_str = std::regex_replace(ss_str, std::regex("Num workers.*\n"), "");
   ss_str = std::regex_replace(ss_str, std::regex("\\[workers.*?\\]"), "");
   ss_str = std::regex_replace(ss_str, std::regex("Connector queue size.*\n"), "");


@@ -129,6 +129,11 @@ def check_uint32(value, arg_name=""):
     check_value(value, [UINT32_MIN, UINT32_MAX])
 
 
+def check_pos_uint32(value, arg_name=""):
+    type_check(value, (int,), arg_name)
+    check_value(value, [POS_INT_MIN, UINT32_MAX])
+
+
 def check_pos_int32(value, arg_name=""):
     type_check(value, (int,), arg_name)
     check_value(value, [POS_INT_MIN, INT32_MAX], arg_name)


@@ -18,7 +18,8 @@
 import copy
 
 from mindspore._c_dataengine import CacheClient
-from ..core.validator_helpers import type_check, check_uint32, check_uint64, check_positive, check_value
+from ..core.validator_helpers import type_check, check_pos_int32, check_pos_uint32, check_uint64, check_positive, \
+    check_value
 
 
 class DatasetCache:
@@ -50,7 +51,7 @@ class DatasetCache:
     def __init__(self, session_id, size=0, spilling=False, hostname=None, port=None, num_connections=None,
                  prefetch_size=None):
-        check_uint32(session_id, "session_id")
+        check_pos_uint32(session_id, "session_id")
         type_check(size, (int,), "size")
         if size != 0:
             check_positive(size, "size")
@@ -62,9 +63,9 @@ class DatasetCache:
             type_check(port, (int,), "port")
             check_value(port, (1025, 65535), "port")
         if num_connections is not None:
-            check_uint32(num_connections, "num_connections")
+            check_pos_int32(num_connections, "num_connections")
         if prefetch_size is not None:
-            check_uint32(prefetch_size, "prefetch_size")
+            check_pos_int32(prefetch_size, "prefetch_size")
         self.session_id = session_id
         self.size = size


@@ -1,4 +1,4 @@
-#!/usr/bin/env bash
+#!/bin/bash
 # Copyright 2021 Huawei Technologies Co., Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,7 +42,7 @@ shutdown_cache_server()
   result=$(cache_admin --stop 2>&1)
   rc=$?
   echo "${result}"
-  if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not up or has been shutdown already" ]]; then
+  if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not reachable or has been shutdown already" ]]; then
     echo "cache_admin command failure!" "${result}"
     exit 1
   fi


@@ -92,6 +92,7 @@ run_ascend()
         &> log$i.log &
         cd ..
     done
+    echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
 }
 
 run_gpu()
@@ -149,6 +150,7 @@ run_gpu()
     --enable_cache=True \
     --cache_session_id=$CACHE_SESSION_ID \
     &> ../train.log &  # dataset train folder
+    echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
 }
 
 run_cpu()
@@ -198,6 +200,7 @@ run_cpu()
     --enable_cache=True \
     --cache_session_id=$CACHE_SESSION_ID \
     &> ../train.log &  # dataset train folder
+    echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
 }
 
 if [ $1 = "Ascend" ] ; then


@@ -131,3 +131,6 @@ if __name__ == '__main__':
         if (epoch + 1) % config.save_checkpoint_epochs == 0:
             save_checkpoint(net, os.path.join(save_ckpt_path, f"mobilenetv2_{epoch+1}.ckpt"))
     print("total cost {:5.4f} s".format(time.time() - start))
+
+    if args_opt.enable_cache:
+        print("Remember to shut down the cache server via \"cache_admin --stop\"")


@@ -1,4 +1,4 @@
-#!/usr/bin/env bash
+#!/bin/bash
 # Copyright 2021 Huawei Technologies Co., Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,7 +42,7 @@ shutdown_cache_server()
  result=$(cache_admin --stop 2>&1)
  rc=$?
  echo "${result}"
-  if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not up or has been shutdown already" ]]; then
+  if [ "${rc}" -ne 0 ] && [[ ! ${result} =~ "Server on port 50052 is not reachable or has been shutdown already" ]]; then
     echo "cache_admin command failure!" "${result}"
     exit 1
   fi


@@ -134,6 +134,10 @@
     then
         python train.py --net=$1 --dataset=$2 --run_distribute=True --device_num=$DEVICE_NUM --dataset_path=$PATH2 \
         --run_eval=$RUN_EVAL --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log &
+        if [ "x${RUN_EVAL}" == "xTrue" ]
+        then
+            echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+        fi
     fi
     cd ..
 done


@@ -120,4 +120,8 @@ then
     python train.py --net=$1 --dataset=$2 --run_distribute=True \
            --device_num=$DEVICE_NUM --device_target="GPU" --dataset_path=$PATH1 --run_eval=$RUN_EVAL \
            --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log &
+    if [ "x${RUN_EVAL}" == "xTrue" ]
+    then
+        echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+    fi
 fi


@@ -123,5 +123,9 @@ if [ $# == 5 ]
 then
     python train.py --net=$1 --dataset=$2 --dataset_path=$PATH1 --run_eval=$RUN_EVAL \
            --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log &
+    if [ "x${RUN_EVAL}" == "xTrue" ]
+    then
+        echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+    fi
 fi
 cd ..


@@ -17,7 +17,7 @@
 CURPATH="$(dirname "$0")"
 . ${CURPATH}/cache_util.sh
 
-if [ $# != 3 ] && [ $# != 4 ]
+if [ $# != 3 ] && [ $# != 4 ] && [ $# != 5 ]
 then
     echo "Usage: bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [PRETRAINED_CKPT_PATH](optional)"
     echo "       bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)"
@@ -120,5 +120,9 @@ if [ $# == 5 ]
 then
     python train.py --net=$1 --dataset=$2 --device_target="GPU" --dataset_path=$PATH1 --run_eval=$RUN_EVAL \
            --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --cache_session_id=$CACHE_SESSION_ID &> log &
+    if [ "x${RUN_EVAL}" == "xTrue" ]
+    then
+        echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+    fi
 fi
 cd ..


@@ -266,3 +266,6 @@ if __name__ == '__main__':
     dataset_sink_mode = (not args_opt.parameter_server) and target != "CPU"
     model.train(config.epoch_size - config.pretrain_epoch_size, dataset, callbacks=cb,
                 sink_size=dataset.get_dataset_size(), dataset_sink_mode=dataset_sink_mode)
+
+    if args_opt.run_eval and args_opt.enable_cache:
+        print("Remember to shut down the cache server via \"cache_admin --stop\"")


@@ -55,8 +55,8 @@ cmd="${CACHE_ADMIN} --start --spilldir /path_that_does_not_exist"
 CacheAdminCmd "${cmd}" 1
 HandleRcExit $? 0 0
 
-# stop cache server first to test start
-StopServer
+# clean up cache server first to test start
+ServerCleanup
 
 # start cache server
 StartServer
 HandleRcExit $? 1 1


@@ -56,7 +56,7 @@ def test_cache_map_basic1():
     if "SESSION_ID" in os.environ:
         session_id = int(os.environ['SESSION_ID'])
     else:
-        session_id = 1
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
 
     some_cache = ds.DatasetCache(session_id=session_id, size=0)
@@ -1970,7 +1970,7 @@ def test_cache_map_mindrecord1():
     if "SESSION_ID" in os.environ:
         session_id = int(os.environ['SESSION_ID'])
     else:
-        session_id = 1
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
 
     some_cache = ds.DatasetCache(session_id=session_id, size=0)
@@ -2006,7 +2006,7 @@ def test_cache_map_mindrecord2():
     if "SESSION_ID" in os.environ:
         session_id = int(os.environ['SESSION_ID'])
     else:
-        session_id = 1
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
 
     some_cache = ds.DatasetCache(session_id=session_id, size=0)
@@ -2029,6 +2029,45 @@ def test_cache_map_mindrecord2():
     logger.info("test_cache_map_mindrecord2 Ended.\n")
 
 
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_map_mindrecord3():
+    """
+    Test cache sharing between the following two pipelines with mindrecord leaf:
+
+              Cache                                       Cache
+                |                                           |
+           Map(decode)                                 Map(decode)
+                |                                           |
+    MindRecord(num_parallel_workers=5)       MindRecord(num_parallel_workers=6)
+    """
+    logger.info("Test cache map mindrecord3")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 5 records
+    columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
+    decode_op = c_vision.Decode()
+
+    ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list=columns_list, num_parallel_workers=5, shuffle=True)
+    ds1 = ds1.map(input_columns=["img_data"], operations=decode_op, cache=some_cache)
+    ds2 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list=columns_list, num_parallel_workers=6, shuffle=True)
+    ds2 = ds2.map(input_columns=["img_data"], operations=decode_op, cache=some_cache)
+
+    iter1 = ds1.create_dict_iterator(num_epochs=1, output_numpy=True)
+    iter2 = ds2.create_dict_iterator(num_epochs=1, output_numpy=True)
+
+    assert sum([1 for _ in iter1]) == 5
+    assert sum([1 for _ in iter2]) == 5
+
+    logger.info("test_cache_map_mindrecord3 Ended.\n")
+
+
 @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
 def test_cache_map_python_sampler1():
     """