!17161 FIX selected CI

From: @ziruiwu
Reviewed-by: @robingrosman, @john_tzanakakis
Signed-off-by: @robingrosman
Committed by: mindspore-ci-bot, 2021-05-30 20:43:01 +08:00 (via Gitee)
commit 583857799f
21 changed files with 87 additions and 58 deletions

View File

@ -1281,7 +1281,8 @@ int32_t CacheServer::Builder::AdjustNumWorkers(int32_t num_workers) {
num_workers = std::max(num_numa_nodes, num_workers);
// But also it shouldn't be too many more than the hardware concurrency
int32_t num_cpus = hw_info_->GetCpuCount();
- num_workers = std::min(2 * num_cpus, num_workers);
+ constexpr int32_t kThreadsPerCore = 2;
+ num_workers = std::min(kThreadsPerCore * num_cpus, num_workers);
// Round up num_workers to a multiple of numa nodes.
auto remainder = num_workers % num_numa_nodes;
if (remainder > 0) num_workers += (num_numa_nodes - remainder);
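
For readers skimming the hunk, the adjustment it touches is easy to sanity-check in isolation. The sketch below is a standalone approximation of that logic, not the CacheServer code itself, and the machine sizes in main() are invented for illustration: clamp the worker count to at least one per NUMA node, cap it at twice the hardware concurrency, then round up to a multiple of the NUMA node count.

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    // Standalone sketch of the clamp-then-round-up logic shown above.
    int32_t AdjustNumWorkersSketch(int32_t num_workers, int32_t num_numa_nodes, int32_t num_cpus) {
      constexpr int32_t kThreadsPerCore = 2;
      num_workers = std::max(num_numa_nodes, num_workers);               // at least one worker per NUMA node
      num_workers = std::min(kThreadsPerCore * num_cpus, num_workers);   // no more than 2x hardware concurrency
      int32_t remainder = num_workers % num_numa_nodes;
      if (remainder > 0) num_workers += (num_numa_nodes - remainder);    // round up to a NUMA multiple
      return num_workers;
    }

    int main() {
      // Hypothetical machine: 2 NUMA nodes, 8 CPUs.
      // 11 workers stays under the 2 * 8 = 16 cap and is rounded up to 12.
      assert(AdjustNumWorkersSketch(11, 2, 8) == 12);
      // 40 workers is capped at 16, which is already a multiple of 2.
      assert(AdjustNumWorkersSketch(40, 2, 8) == 16);
      return 0;
    }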

View File

@ -63,13 +63,13 @@ Status BucketBatchByLengthOp::Builder::SanityCheck() const {
return Status::OK();
}
- Status BucketBatchByLengthOp::Builder::Build(std::shared_ptr<BucketBatchByLengthOp> *new_bucket_batch_by_length_op) {
+ Status BucketBatchByLengthOp::Builder::Build(std::shared_ptr<BucketBatchByLengthOp> *bucket_batch_by_length_op) {
RETURN_IF_NOT_OK(SanityCheck());
// insert 0 for the first bucket
(void)builder_bucket_boundaries_.insert(builder_bucket_boundaries_.begin(), 0);
- *new_bucket_batch_by_length_op = std::make_shared<BucketBatchByLengthOp>(
+ *bucket_batch_by_length_op = std::make_shared<BucketBatchByLengthOp>(
builder_length_dependent_columns_, builder_bucket_boundaries_, builder_bucket_batch_sizes_,
builder_element_length_function_, builder_pad_info_, builder_pad_to_bucket_boundary_, builder_drop_remainder_,
builder_op_connector_size_);

View File

@ -41,7 +41,7 @@ BuildVocabOp::BuildVocabOp(std::shared_ptr<Vocab> vocab, std::vector<std::string
// init two queues for thread sync
distributor_queue_ = std::make_unique<Queue<TensorRow>>(num_workers * op_conn_size);
collector_queue_ =
- std::make_unique<Queue<std::unique_ptr<std::unordered_map<std::string, int64_t>>>>(num_workers * op_conn_size);
+ std::make_unique<Queue<std::unique_ptr<std::unordered_map<std::string, int64_t>>>>((num_workers * op_conn_size));
}
Status BuildVocabOp::WorkerEntry(int32_t worker_id) {

View File

@ -35,8 +35,9 @@ ParallelOp::ParallelOp(int32_t num_workers, int32_t op_connector_size, std::shar
epoch_sync_flag_(false) {
// reduce excessive memory usage with high parallelism
// when num_workers > 4, reduce op_connector_size to have similar total size if there were only 4 workers
- if (num_workers_ > 4) {
- oc_queue_size_ = std::max(1, op_connector_size * 4 / num_workers_);
+ constexpr int32_t worker_limit = 4;
+ if (num_workers_ > worker_limit) {
+ oc_queue_size_ = std::max(1, op_connector_size * worker_limit / num_workers_);
}
}
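
The scaling in this hunk keeps the total queue footprint roughly constant: with more than 4 workers, each worker's connector shrinks so that num_workers * oc_queue_size stays close to the 4-worker budget of 4 * op_connector_size. A small standalone check, with a hypothetical connector size chosen only for illustration:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <initializer_list>

    int main() {
      constexpr int32_t worker_limit = 4;
      constexpr int32_t op_connector_size = 16;  // hypothetical default connector size
      for (int32_t num_workers : {8, 16, 64}) {
        int32_t oc_queue_size = std::max(1, op_connector_size * worker_limit / num_workers);
        // Total buffered rows stay within the 4-worker budget of 4 * 16 = 64.
        assert(num_workers * oc_queue_size <= worker_limit * op_connector_size);
      }
      return 0;
    }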

View File

@ -193,11 +193,15 @@ bool CelebAOp::CheckDatasetTypeValid() {
return false;
}
// train:0, valid=1, test=2
if (usage_ == "train" && (type == 0)) {
constexpr int32_t train_type = 0;
constexpr int32_t valid_type = 1;
constexpr int32_t test_type = 2;
if (usage_ == "train" && (type == train_type)) {
return true;
} else if (usage_ == "valid" && (type == 1)) {
} else if (usage_ == "valid" && (type == valid_type)) {
return true;
} else if (usage_ == "test" && (type == 2)) {
} else if (usage_ == "test" && (type == test_type)) {
return true;
}

View File

@ -205,7 +205,9 @@ Status CifarOp::ReadCifar10BlockData() {
Status CifarOp::ReadCifar100BlockData() {
// CIFAR 100 has 2 bin files. train.bin (50K imgs, 153,700KB) and test.bin (10K imgs, 30,740KB)
// each img has two labels. Each row then is 32 * 32 * 3 + 2 = 3,074 Bytes
- uint32_t num_cifar100_records = 0; // test:10000, train:50000
+ uint32_t num_cifar100_records = 0;  // test:10000, train:50000
+ constexpr uint32_t num_cifar100_test_records = 10000;
+ constexpr uint32_t num_cifar100_train_records = 50000;
uint32_t block_size = (kCifarImageSize + 2) * kCifarBlockImageNum; // about 2M
std::vector<unsigned char> image_data(block_size * sizeof(unsigned char), 0);
for (auto &file : cifar_files_) {
@ -220,9 +222,9 @@ Status CifarOp::ReadCifar100BlockData() {
if (usage_ == "test" && file_name.find("test") == std::string::npos) continue;
if (file_name.find("test") != std::string::npos) {
- num_cifar100_records = 10000;
+ num_cifar100_records = num_cifar100_test_records;
} else if (file_name.find("train") != std::string::npos) {
- num_cifar100_records = 50000;
+ num_cifar100_records = num_cifar100_train_records;
} else {
RETURN_STATUS_UNEXPECTED("Invalid file, Cifar100 train/test file not found in: " + file_name);
}
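
As a quick check of the sizes quoted in the comment above (under the standard CIFAR-100 binary layout of one coarse label byte, one fine label byte, and a 32x32x3 image; this is an added sanity-check sketch, not code from the commit, and treats 1 KB as 1,000 bytes):

    #include <cstdint>

    // Each CIFAR-100 binary record: 2 label bytes + 32 * 32 * 3 image bytes.
    constexpr int64_t kRecordBytes = 32 * 32 * 3 + 2;
    static_assert(kRecordBytes == 3074, "each row is 3,074 bytes");
    // 50,000 training records and 10,000 test records.
    static_assert(50000 * kRecordBytes == 153700000, "train.bin is about 153,700 KB");
    static_assert(10000 * kRecordBytes == 30740000, "test.bin is about 30,740 KB");

    int main() { return 0; }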

View File

@ -165,10 +165,8 @@ Status ExecutionTree::Launch() {
std::string err_msg = "Invalid thread number.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
- if (thread_num > 8)
- cv::setNumThreads(8);
- else
- cv::setNumThreads(thread_num);
+ constexpr int32_t max_cv_threads_cnt = 8;
+ cv::setNumThreads(thread_num > max_cv_threads_cnt ? max_cv_threads_cnt : thread_num);
#endif
// Tree must be built and prepared before it can be launched!
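
The new one-liner caps OpenCV's thread pool at 8; the ternary is equivalent to a std::min clamp, as in this small standalone sketch (the helper name is illustrative and not part of the codebase):

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    // Equivalent formulation of the cap applied above.
    int32_t CapCvThreads(int32_t thread_num) {
      constexpr int32_t max_cv_threads_cnt = 8;
      return std::min(thread_num, max_cv_threads_cnt);
    }

    int main() {
      assert(CapCvThreads(4) == 4);   // below the cap: unchanged
      assert(CapCvThreads(32) == 8);  // above the cap: clamped to 8
      return 0;
    }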

View File

@ -109,7 +109,7 @@ Status CSVNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
std::make_shared<CsvOp::Record<float>>(CsvOp::FLOAT, std::dynamic_pointer_cast<CsvRecord<float>>(v)->value));
} else if (v->type == CsvType::STRING) {
column_default_list.push_back(std::make_shared<CsvOp::Record<std::string>>(
- CsvOp::STRING, std::dynamic_pointer_cast<CsvRecord<std::string>>(v)->value));
+ CsvOp::STRING, (std::dynamic_pointer_cast<CsvRecord<std::string>>(v))->value));
}
}

View File

@ -81,7 +81,8 @@ void MindDataNode::Print(std::ostream &out) const { out << Name() + "(file:" + d
Status MindDataNode::ValidateParams() {
RETURN_IF_NOT_OK(DatasetNode::ValidateParams());
- if (!search_for_pattern_ && dataset_files_.size() > 4096) {
+ constexpr size_t max_len = 4096;
+ if (!search_for_pattern_ && dataset_files_.size() > max_len) {
std::string err_msg =
"MindDataNode: length of dataset_file must be less than or equal to 4096, dataset_file length: " +
std::to_string(dataset_file_.size());

View File

@ -40,8 +40,9 @@ Status TensorOpFusionPass::Visit(std::shared_ptr<MapNode> node, bool *const modi
[](auto op, const std::string &nm) { return op->Name() == nm; });
if (itr != ops.end()) {
MS_LOG(WARNING) << "Fusing pre-build Decode and RandomCropResize into one pre-build.";
- auto op = dynamic_cast<RandomCropAndResizeOp *>((*(itr + 1))->Build().get());
- (*itr) = std::make_shared<transforms::PreBuiltOperation>(std::make_shared<RandomCropDecodeResizeOp>(*op));
+ auto fused_op = dynamic_cast<RandomCropAndResizeOp *>((*(itr + 1))->Build().get());
+ RETURN_UNEXPECTED_IF_NULL(fused_op);
+ (*itr) = std::make_shared<transforms::PreBuiltOperation>(std::make_shared<RandomCropDecodeResizeOp>(*fused_op));
ops.erase(itr + 1);
node->setOperations(ops);
*modified = true;
@ -55,10 +56,10 @@ Status TensorOpFusionPass::Visit(std::shared_ptr<MapNode> node, bool *const modi
// return here if no pattern is found
RETURN_OK_IF_TRUE(itr == ops.end());
- auto *op = dynamic_cast<vision::RandomResizedCropOperation *>((itr + 1)->get());
- RETURN_UNEXPECTED_IF_NULL(op);
+ auto *fused_ir = dynamic_cast<vision::RandomResizedCropOperation *>((itr + 1)->get());
+ RETURN_UNEXPECTED_IF_NULL(fused_ir);
// fuse the two ops
- (*itr) = std::make_shared<vision::RandomCropDecodeResizeOperation>(*op);
+ (*itr) = std::make_shared<vision::RandomCropDecodeResizeOperation>(*fused_ir);
ops.erase(itr + 1);
node->setOperations(ops);
*modified = true;

View File

@ -47,7 +47,8 @@ Status AutoWorkerPass::RunOnTree(std::shared_ptr<DatasetNode> root_ir, bool *con
float max_weight = 0;
for (const auto &p : pass.weight_profile_) max_weight = std::max(max_weight, p.second);
RETURN_IF_NOT_OK(pass.Run(root_ir, modified));
- if (pass.parallel_ops_.size() > 3) {
+ constexpr size_t max_num_ops = 3;
+ if (pass.parallel_ops_.size() > max_num_ops) {
MS_LOG(WARNING) << "AutoNumWorker right now is only suitable for simple dataset pipelines that has at most, 1 leaf "
<< "1 batch and 1 map. AutoNumWorker may not be optimal for usage on complex pipelines.";
}

View File

@ -360,15 +360,16 @@ Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string
// Only analyze the middle half of the samples
// Starting and ending may be impacted by startup or ending pipeline activities
- int start_analyze = total_samples / 4;
- int end_analyze = total_samples - start_analyze;
+ constexpr int64_t sample_sections = 4;
+ int64 start_analyze = total_samples / sample_sections;
+ int64 end_analyze = total_samples - start_analyze;
double op_util = 0;
*utilization = 0;
// start loop from 0 was as don't want to analyze op -1
for (auto op_id = 0; op_id < id_count_; op_id++) {
- int sum = 0;
- int index = op_id + 1;
+ int64 sum = 0;
+ int64 index = op_id + 1;
for (int i = start_analyze; i < end_analyze; i++) {
sum += cpu_op_util_[i][index].user_utilization_;
sum += cpu_op_util_[i][index].sys_utilization_;
@ -517,11 +518,12 @@ Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *
name->clear();
name->append("process_info");
int total_samples = process_util_.size();
- int sum = 0;
+ int64 sum = 0;
// Only analyze the middle half of the samples
// Starting and ending may be impacted by startup or ending pipeline activities
- int start_analyze = total_samples / 4;
- int end_analyze = total_samples - start_analyze;
+ constexpr int64_t sample_sections = 4;
+ int64 start_analyze = total_samples / sample_sections;
+ int64 end_analyze = total_samples - start_analyze;
for (int i = start_analyze; i < end_analyze; i++) {
sum += process_util_[i].user_utilization_;
@ -614,7 +616,8 @@ Status CpuSampling::SaveSamplingItervalToFile() {
Status CpuSampling::Analyze() {
std::string name;
double utilization = 0;
+ constexpr double total_cpu_thold = 90;
+ constexpr double op_cpu_thold = 80;
// Keep track of specific information returned by different CPU sampling types
double total_utilization = 0;
double max_op_utilization = 0;
@ -633,7 +636,7 @@ Status CpuSampling::Analyze() {
detailed_op_cpu_message = extra_message;
}
}
- if ((total_utilization < 90) && (max_op_utilization > 80)) {
+ if ((total_utilization < total_cpu_thold) && (max_op_utilization > op_cpu_thold)) {
MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread. "
<< "This operator may benefit from increasing num_parallel_workers."
<< "Full Operator CPU utilization for all operators: " << detailed_op_cpu_message << std::endl;
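
The profiling changes above repeatedly use the same windowing idea: skip the first and last quarter of the samples so startup and shutdown activity does not skew the average. A minimal standalone sketch of that computation (the helper name and the sample values are invented for illustration):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Average only the middle half of the samples, as the Analyze() methods above do.
    double MiddleHalfAverage(const std::vector<int64_t> &samples) {
      constexpr int64_t sample_sections = 4;
      int64_t total_samples = static_cast<int64_t>(samples.size());
      int64_t start_analyze = total_samples / sample_sections;
      int64_t end_analyze = total_samples - start_analyze;
      int64_t sum = 0;
      for (int64_t i = start_analyze; i < end_analyze; i++) sum += samples[i];
      int64_t count = end_analyze - start_analyze;
      return count > 0 ? static_cast<double>(sum) / count : 0.0;
    }

    int main() {
      // 8 samples: the spiky first and last quarters (indices 0-1 and 6-7) are ignored.
      std::vector<int64_t> util = {95, 90, 40, 42, 38, 40, 5, 0};
      assert(MiddleHalfAverage(util) == 40.0);  // (40 + 42 + 38 + 40) / 4
      return 0;
    }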

View File

@ -43,7 +43,8 @@ Status CenterCropOp::Compute(const std::shared_ptr<Tensor> &input, std::shared_p
int32_t left = crop_wid_ - input->shape()[1];
std::shared_ptr<Tensor> pad_image;
- CHECK_FAIL_RETURN_UNEXPECTED((top < input->shape()[0] * 3 && left < input->shape()[1] * 3),
+ constexpr int64_t pad_limit = 3;
+ CHECK_FAIL_RETURN_UNEXPECTED((top < input->shape()[0] * pad_limit && left < input->shape()[1] * pad_limit),
"CenterCrop: CenterCropOp padding size is more than 3 times the original size.");
if (top > 0 && left > 0) { // padding only

View File

@ -60,7 +60,8 @@ Status ValidateVectorColorAttribute(const std::string &op_name, const std::strin
for (auto &attr_val : attr) {
RETURN_IF_NOT_OK(ValidateScalar(op_name, attr_name, attr_val, range, false, false));
}
- if (attr.size() == 2 && (attr[0] > attr[1])) {
+ constexpr size_t attr_size_two = 2;
+ if (attr.size() == attr_size_two && (attr[0] > attr[1])) {
std::string err_msg = op_name + ":" + attr_name +
" lower bound must be less or equal to upper bound, got lb: " + std::to_string(attr[0]) +
", ub: " + std::to_string(attr[1]);

View File

@ -44,7 +44,7 @@ Status NgramOp::Compute(const std::shared_ptr<Tensor> &input, std::shared_ptr<Te
offsets.reserve(1 + l_len_ + r_len_ + input->shape().NumOfElements());
str_buffer.reserve(l_pad_with_sp_.size() * l_len_ + r_pad_with_sp_.size() * r_len_ + input->SizeInBytes());
offsets.push_back(str_buffer.size()); // insert 0 as the starting pos
- for (int i = 0; i < l_len_; i++) offsets.push_back((str_buffer += l_pad_with_sp_).size());
+ for (int l_i = 0; l_i < l_len_; l_i++) offsets.push_back((str_buffer += l_pad_with_sp_).size());
for (auto itr = input->begin<std::string_view>(); itr != input->end<std::string_view>(); itr++) {
str_buffer += (*itr);
@ -52,7 +52,7 @@ Status NgramOp::Compute(const std::shared_ptr<Tensor> &input, std::shared_ptr<Te
offsets.push_back(str_buffer.size());
}
- for (int i = 0; i < r_len_; i++) offsets.push_back((str_buffer += r_pad_with_sp_).size());
+ for (int r_i = 0; r_i < r_len_; r_i++) offsets.push_back((str_buffer += r_pad_with_sp_).size());
for (auto n : ngrams_) {
CHECK_FAIL_RETURN_UNEXPECTED(n > 0, "Ngram: ngrams needs to be a positive number.\n");
@ -63,8 +63,8 @@ Status NgramOp::Compute(const std::shared_ptr<Tensor> &input, std::shared_ptr<Te
} else {
CHECK_FAIL_RETURN_UNEXPECTED(end_ind - n >= 0, "Ngram: get offsets failed.");
- for (int i = start_ind; i < end_ind - n; i++) {
- res.emplace_back(str_buffer.substr(offsets[i], offsets[i + n] - offsets[i] - separator_.size()));
+ for (int ind = start_ind; ind < end_ind - n; ind++) {
+ res.emplace_back(str_buffer.substr(offsets[ind], offsets[ind + n] - offsets[ind] - separator_.size()));
}
}
}

View File

@ -53,12 +53,12 @@ from .iterators import DictIterator, TupleIterator, DummyIterator, check_iterato
ITERATORS_LIST, _unset_iterator_cleanup
from .queue import _SharedQueue
from .validators import check_batch, check_shuffle, check_map, check_filter, check_repeat, check_skip, check_zip, \
- check_rename, check_numpyslicesdataset, check_device_send, \
- check_take, check_project, check_imagefolderdataset, check_mnist_cifar_dataset, check_manifestdataset, \
- check_tfrecorddataset, check_vocdataset, check_cocodataset, check_celebadataset, check_minddataset, \
- check_generatordataset, check_sync_wait, check_zip_dataset, check_add_column, check_textfiledataset, check_concat, \
- check_random_dataset, check_split, check_bucket_batch_by_length, check_cluedataset, check_save, check_csvdataset, \
- check_paddeddataset, check_tuple_iterator, check_dict_iterator, check_schema, check_to_device_send
+ check_rename, check_numpyslicesdataset, check_device_send, check_take, check_project, check_imagefolderdataset, \
+ check_mnist_cifar_dataset, check_manifestdataset, check_tfrecorddataset, check_vocdataset, check_cocodataset, \
+ check_celebadataset, check_minddataset, check_generatordataset, check_sync_wait, check_zip_dataset, \
+ check_add_column, check_textfiledataset, check_concat, check_random_dataset, check_split, \
+ check_bucket_batch_by_length, check_cluedataset, check_save, check_csvdataset, check_paddeddataset, \
+ check_tuple_iterator, check_dict_iterator, check_schema, check_to_device_send
from ..core.config import get_callback_timeout, _init_device_info, get_enable_shared_mem, get_num_parallel_workers, \
get_prefetch_size, get_dynamic_columns
from ..core.datatypes import mstype_to_detype, mstypelist_to_detypelist
@ -88,7 +88,7 @@ def shuffle_to_shuffle_mode(shuffle):
if shuffle is None or shuffle:
shuffle_mode = cde.ShuffleMode.GLOBAL # Global shuffle
else:
- shuffle_mode = cde.ShuffleMode.FALSE # No shuffle
+ shuffle_mode = cde.ShuffleMode.FALSE  # No shuffle
else:
shuffle_mode = ShuffleToShuffleMode[shuffle]
return shuffle_mode
@ -156,6 +156,7 @@ def _get_operator_process():
fetched_all = fetched_all and item_full
return op_process, fetched_all
def _set_dataset_permissions(file_name, num_files):
"""
set saved dataset files' permissions to 600
@ -174,6 +175,7 @@ def _set_dataset_permissions(file_name, num_files):
if os.path.exists(index_file):
os.chmod(index_file, stat.S_IRUSR | stat.S_IWUSR)
class Dataset:
"""
Abstract class to represent a dataset in DataEngine's data pipeline.
@ -1593,7 +1595,7 @@ class Dataset:
for col in data.keys():
if col in dynamic_columns:
shape_mismatch = "dynamic column [" + col + "] with shape " + str(dynamic_columns[col]) + \
- " does not match dataset column [" + col + "] with shape " + str(list(data[col].shape))
+ " does not match dataset column [" + col + "] with shape " + str(list(data[col].shape))
if data[col].ndim != len(dynamic_columns[col]):
raise RuntimeError(shape_mismatch)
for dim in range(len(dynamic_columns[col])):
@ -1850,6 +1852,7 @@ class MappableDataset(SourceDataset):
self.sampler = samplers.select_sampler(num_samples, sampler, shuffle, num_shards, shard_id)
def add_sampler(self, new_sampler):
""" add a sampler """
# note: By adding a sampler, the sampled IDs will flow to new_sampler
# after first passing through the current samplers attached to this dataset.
self.dataset_size = None
@ -2365,7 +2368,6 @@ def _pyfunc_worker_init(pyfunc_list, args_queue, ret_queue):
_RET_QUEUE = ret_queue
# Pyfunc worker execution function
# All exceptions will be raised to main processes
def _pyfunc_worker_exec(index, qid, *args):
@ -2388,6 +2390,7 @@ def _pyfunc_worker_exec(index, qid, *args):
## not using shared memory for passing arguments, call function directly
return _GLOBAL_PYFUNC_LIST[index](*args)
# PythonCallable wrapper for multiprocess pyfunc
class _PythonCallable:
"""
@ -3416,12 +3419,11 @@ class SamplerFn:
self.eof = threading.Event()
# Create workers
- #get default queue size and adjust queuesize per worker if there are large # workers
+ # get default queue size and adjust queuesize per worker if there are large # workers
queue_size = get_prefetch_size()
queue_size = min(queue_size, queue_size * 4 // num_worker)
queue_size = max(2, queue_size)
for _ in range(num_worker):
if multi_process is True:
worker = _GeneratorWorkerMp(dataset, self.eof, max_rowsize, queue_size)

View File

@ -121,6 +121,7 @@ class BuiltinSampler:
self.child_sampler = sampler
def get_child(self):
""" add a child sampler """
return self.child_sampler
def parse_child(self):
@ -136,9 +137,11 @@ class BuiltinSampler:
return c_child_sampler
def is_shuffled(self):
""" not implemented """
raise NotImplementedError("Sampler must implement is_shuffled.")
def is_sharded(self):
""" not implemented """
raise NotImplementedError("Sampler must implement is_sharded.")
def get_num_samples(self):

View File

@ -392,6 +392,7 @@ def construct_tensor_ops(operations):
def to_policy(op_list):
""" op_list to policy """
policy_tensor_ops = []
for policy_list in op_list:
sub_policy_tensor_ops = []
@ -403,12 +404,17 @@ def to_policy(op_list):
def to_shuffle_mode(shuffle):
- if shuffle == 2: return "global"
- if shuffle == 1: return "files"
- return False
+ """ int to shuffle mode """
+ ret_val = False
+ if shuffle == 2:
+ ret_val = "global"
+ elif shuffle == 1:
+ ret_val = "files"
+ return ret_val
def to_interpolation_mode(inter):
""" int to interpolation mode """
return {
0: Inter.LINEAR,
1: Inter.NEAREST,
@ -418,6 +424,7 @@ def to_interpolation_mode(inter):
def to_border_mode(border):
""" int to border mode """
return {
0: Border.CONSTANT,
1: Border.EDGE,
@ -427,6 +434,7 @@ def to_border_mode(border):
def to_mstype(data_type):
""" str to mstype """
return {
"bool": mstype.bool_,
"int8": mstype.int8,
@ -445,6 +453,7 @@ def to_mstype(data_type):
def to_image_batch_format(image_batch_format):
""" int to image batch format """
return {
0: ImageBatchFormat.NHWC,
1: ImageBatchFormat.NCHW
@ -452,4 +461,5 @@ def to_image_batch_format(image_batch_format):
def check_and_replace_input(input_value, expect, replace):
""" check and replace input arg """
return replace if input_value == expect else input_value

View File

@ -465,7 +465,8 @@ def check_pad_info(key, val):
for dim in val[0]:
if dim is not None:
type_check(dim, (int,), "dim in pad_shape")
- assert dim > 0, "pad shape should be positive integers"
+ if dim <= 0:
+ raise ValueError("pad shape should be positive integers")
if val[1] is not None:
type_check(val[1], (int, float, str, bytes), "pad_value")

View File

@ -18,12 +18,11 @@ NLP text processing module which is developed with ICU4C and cppjieba.
utils provides some general methods for NLP text processing.
"""
import platform
- from .transforms import Lookup, JiebaTokenizer, UnicodeCharTokenizer, Ngram, WordpieceTokenizer, TruncateSequencePair, \
- ToNumber, SlidingWindow, SentencePieceTokenizer, PythonTokenizer
+ from .transforms import Lookup, JiebaTokenizer, UnicodeCharTokenizer, Ngram, WordpieceTokenizer, \
+ TruncateSequencePair, ToNumber, SlidingWindow, SentencePieceTokenizer, PythonTokenizer
from .utils import to_str, to_bytes, JiebaMode, Vocab, NormalizeForm, SentencePieceVocab, SentencePieceModel, \
SPieceTokenizerOutType, SPieceTokenizerLoadType
__all__ = [
"Lookup", "JiebaTokenizer", "UnicodeCharTokenizer", "Ngram",
"to_str", "to_bytes", "Vocab", "WordpieceTokenizer", "TruncateSequencePair", "ToNumber",

View File

@ -79,7 +79,7 @@ class Vocab(cde.Vocab):
word_list(list): a list of string where each element is a word of type string.
special_tokens(list, optional): a list of strings, each one is a special token. for example
special_tokens=["<pad>","<unk>"] (default=None, no special tokens will be added).
- special_first(bool, optional): whether special_tokens will be prepended/appended to vocab, If special_tokens
+ special_first(bool, optional): whether special_tokens is prepended or appended to vocab. If special_tokens
is specified and special_first is set to True, special_tokens will be prepended (default=True).
Returns: