Fix cache_admin server start stderr output

Fix TFRecord get_dataset_size() when cached
Lixia Chen 2021-06-18 16:24:13 -04:00
parent 35c1f14cf3
commit 1f03b16e0e
7 changed files with 192 additions and 8 deletions


@@ -589,16 +589,14 @@ Status CacheAdminArgHandler::StartServer() {
     (void)dup2(fd[0], STDIN_FILENO);
     close(fd[0]);
     std::string msg;
+    std::string buf;
     const uint32_t buf_sz = 1024;
-    msg.resize(buf_sz);
-    auto n = read(0, msg.data(), buf_sz);
+    buf.resize(buf_sz);
+    auto n = read(0, buf.data(), buf_sz);
     // keep reading until we drain the pipe
     while (n > 0) {
-      msg.resize(n);
-      // Not an error, some info message goes to stdout
-      std::cout << msg;
-      msg.resize(buf_sz);
-      n = read(0, msg.data(), buf_sz);
+      msg += buf.substr(0, n);
+      n = read(0, buf.data(), buf_sz);
     }
     if (n < 0) {
       std::string err_msg = "Failed to read from pipeline " + std::to_string(errno);
@@ -613,6 +611,9 @@ Status CacheAdminArgHandler::StartServer() {
       auto exit_status = WEXITSTATUS(status);
       if (exit_status) {
        return Status(StatusCode::kMDUnexpectedError, msg);
+      } else {
+        // Not an error, some info message goes to stdout
+        std::cout << msg;
       }
     }
     return Status::OK();
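
The change above buffers everything the child writes into msg and defers printing until the parent has checked the exit status: on a non-zero exit the buffered text becomes the error Status, and only on a clean exit is it echoed to stdout. A minimal sketch of the same policy in Python (not MindSpore code; the command list is just a placeholder):

import subprocess
import sys

def start_and_report(cmd):
    # Capture the child's stdout/stderr instead of streaming them as they arrive.
    result = subprocess.run(cmd, capture_output=True, text=True)
    output = result.stdout + result.stderr
    if result.returncode != 0:
        # Failed start: surface the buffered output as the error instead of printing it.
        raise RuntimeError(output)
    # Clean exit: the buffered output is informational, print it now.
    sys.stdout.write(output)

if __name__ == "__main__":
    start_and_report(["echo", "cache server started"])

Previously the loop printed each chunk as soon as it was read, so informational text reached stdout even when the server failed to start.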


@@ -176,5 +176,28 @@ Status MapNode::to_json(nlohmann::json *out_json) {
   *out_json = args;
   return Status::OK();
 }
+
+// Gets the dataset size
+Status MapNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_getter, bool estimate,
+                               int64_t *dataset_size) {
+  if (dataset_size_ > 0) {
+    *dataset_size = dataset_size_;
+    return Status::OK();
+  }
+  // If a cache is injected after a MapNode, the pipeline may need to handle a different number of rows than a
+  // non-cached pipeline. This is mostly true for TFRecord datasets, which use row-based sharding with a cache but
+  // file-based sharding without one. However, MapNode cannot tell whether the leaf below it is a TFRecord node,
+  // so it does not rely on its child here but simply runs through the tree.
+  if (!IsSizeDefined() || IsCached()) {
+    RETURN_IF_NOT_OK(size_getter->DryRun(shared_from_this(), dataset_size));
+    dataset_size_ = *dataset_size;
+    return Status::OK();
+  }
+  if (children_.size() == 1) {
+    return children_.front()->GetDatasetSize(size_getter, estimate, dataset_size);
+  } else {
+    RETURN_STATUS_UNEXPECTED("Trying to get dataset size from leaf node, missing override");
+  }
+}
 } // namespace dataset
 } // namespace mindspore
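
For readers less familiar with the C++ macros, here is a rough Python paraphrase of the decision order above (not the MindSpore API; Node and dry_run are made-up stand-ins for DatasetNode and DatasetSizeGetter::DryRun):

from dataclasses import dataclass, field
from typing import Callable, List, Optional

@dataclass
class Node:
    size_defined: bool = True          # can this op report its size without running?
    is_cached: bool = False            # is a cache injected at this op?
    cached_size: Optional[int] = None  # memoized result, like dataset_size_
    children: List["Node"] = field(default_factory=list)

def get_dataset_size(node: Node, dry_run: Callable[[Node], int]) -> int:
    if node.cached_size is not None:
        return node.cached_size              # reuse a previously computed size
    if not node.size_defined or node.is_cached:
        node.cached_size = dry_run(node)     # cached pipelines may reshard rows, so walk the whole tree
        return node.cached_size
    if len(node.children) == 1:
        return get_dataset_size(node.children[0], dry_run)  # defer to the single child
    raise RuntimeError("missing GetDatasetSize override")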


@@ -93,6 +93,15 @@ class MapNode : public DatasetNode {
   /// \return Status of the function
   Status to_json(nlohmann::json *out_json) override;
 
+  /// \brief Base-class override for GetDatasetSize
+  /// \param[in] size_getter Shared pointer to DatasetSizeGetter
+  /// \param[in] estimate This is only supported by some of the ops and it's used to speed up the process of getting
+  ///     dataset size at the expense of accuracy.
+  /// \param[out] dataset_size The size of the dataset
+  /// \return Status of the function
+  Status GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_getter, bool estimate,
+                        int64_t *dataset_size) override;
+
  private:
   std::vector<std::shared_ptr<TensorOperation>> operations_;
   std::vector<std::string> input_columns_;


@@ -171,7 +171,8 @@ Status TFRecordNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &si
   }
   int64_t num_rows;
   constexpr int64_t kThreadCount = 8;
-  if (!shard_equal_rows_) {
+  // By default, TFRecord does file-based sharding, but when a cache is injected it switches to row-based sharding.
+  if (!shard_equal_rows_ && !IsCached()) {
     // Data will be sharded by file
     std::vector<std::string> shard_file_list;
     RETURN_IF_NOT_OK(GetShardFileList(&shard_file_list));
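
The sizes asserted by the new tests further down are consistent with the equal-rows formula, where every shard reports ceil(total_rows / num_shards). A standalone check in plain Python (not the MindSpore API; the record counts are taken from the comments in those tests):

import math

def rows_per_shard(total_rows, num_shards):
    # Row-based (equal-rows) sharding spreads rows as evenly as possible,
    # so every shard reports the same size.
    return math.ceil(total_rows / num_shards)

assert rows_per_shard(3, 2) == 2   # TFRecord: 3 records, num_shards=2 (test_cache_nomap_dataset_size1)
assert rows_per_shard(4, 3) == 2   # CelebA: 4 records, num_shards=3 (test_cache_map_dataset_size1)

Without a cache, TFRecord keeps its file-based sharding, so the reported size depends on which files land in the shard rather than on this formula.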


@@ -130,6 +130,9 @@ HandleRcExit $? 0 0
 PytestCmd "test_cache_map.py" "test_cache_map_nested_repeat"
 HandleRcExit $? 0 0
 
+PytestCmd "test_cache_map.py" "test_cache_map_dataset_size" 1
+HandleRcExit $? 0 0
+
 GetSession
 HandleRcExit $? 1 1
 export SESSION_ID=$session_id
@@ -334,6 +337,9 @@ HandleRcExit $? 0 0
 PytestCmd "test_cache_nomap.py" "test_cache_nomap_pyfunc" 1
 HandleRcExit $? 0 0
 
+PytestCmd "test_cache_nomap.py" "test_cache_nomap_dataset_size" 1
+HandleRcExit $? 0 0
+
 GetSession
 HandleRcExit $? 1 1
 export SESSION_ID=$session_id


@@ -2227,6 +2227,76 @@ def test_cache_map_interrupt_and_rerun():
     logger.info("test_cache_map_interrupt_and_rerun Ended.\n")
 
 
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_map_dataset_size1():
+    """
+    Test get_dataset_size() when cache is injected directly after a mappable leaf
+
+       Cache
+         |
+       CelebA
+    """
+    logger.info("Test cache map dataset size 1")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 4 records
+    ds1 = ds.CelebADataset(CELEBA_DATA_DIR, num_shards=3, shard_id=0, cache=some_cache)
+
+    dataset_size = ds1.get_dataset_size()
+    assert dataset_size == 2
+
+    num_iter = 0
+    for _ in ds1.create_dict_iterator():
+        num_iter += 1
+
+    logger.info("Number of data in ds1: {} ".format(num_iter))
+    assert num_iter == dataset_size
+
+    logger.info("test_cache_map_dataset_size1 Ended.\n")
+
+
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_map_dataset_size2():
+    """
+    Test get_dataset_size() when cache is injected after map
+
+       Cache
+         |
+    Map(resize)
+         |
+       CelebA
+    """
+    logger.info("Test cache map dataset size 2")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 4 records
+    ds1 = ds.CelebADataset(CELEBA_DATA_DIR, shuffle=False, decode=True, num_shards=3, shard_id=0)
+    resize_op = c_vision.Resize((224, 224))
+    ds1 = ds1.map(operations=resize_op, input_columns=["image"], cache=some_cache)
+
+    dataset_size = ds1.get_dataset_size()
+    assert dataset_size == 2
+
+    num_iter = 0
+    for _ in ds1.create_dict_iterator():
+        num_iter += 1
+
+    logger.info("Number of data in ds1: {} ".format(num_iter))
+    assert num_iter == dataset_size
+
+    logger.info("test_cache_map_dataset_size2 Ended.\n")
+
+
 if __name__ == '__main__':
     # This is just a list of tests, don't try to run these tests with 'python test_cache_map.py'
     # since cache server is required to be brought up first
@@ -2285,3 +2355,5 @@ if __name__ == '__main__':
     test_cache_map_python_sampler1()
     test_cache_map_python_sampler2()
     test_cache_map_nested_repeat()
+    test_cache_map_dataset_size1()
+    test_cache_map_dataset_size2()


@@ -2355,6 +2355,76 @@ def test_cache_nomap_all_rows_cached():
     logger.info("test_cache_nomap_all_rows_cached Ended.\n")
 
 
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_nomap_dataset_size1():
+    """
+    Test get_dataset_size() when cache is injected directly after a non-mappable leaf
+
+       Cache
+         |
+      TFRecord
+    """
+    logger.info("Test cache nomap dataset size 1")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 3 records in it only
+    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0, cache=some_cache)
+
+    dataset_size = ds1.get_dataset_size()
+    assert dataset_size == 2
+
+    num_iter = 0
+    for _ in ds1.create_dict_iterator():
+        num_iter += 1
+
+    logger.info("Number of data in ds1: {} ".format(num_iter))
+    assert num_iter == dataset_size
+
+    logger.info("test_cache_nomap_dataset_size1 Ended.\n")
+
+
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_nomap_dataset_size2():
+    """
+    Test get_dataset_size() when cache is injected after map
+
+       Cache
+         |
+    Map(decode)
+         |
+      TFRecord
+    """
+    logger.info("Test cache nomap dataset size 2")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        raise RuntimeError("Testcase requires SESSION_ID environment variable")
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 3 records in it only
+    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0)
+    decode_op = c_vision.Decode()
+    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
+
+    dataset_size = ds1.get_dataset_size()
+    assert dataset_size == 2
+
+    num_iter = 0
+    for _ in ds1.create_dict_iterator():
+        num_iter += 1
+
+    logger.info("Number of data in ds1: {} ".format(num_iter))
+    assert num_iter == dataset_size
+
+    logger.info("test_cache_nomap_dataset_size2 Ended.\n")
+
+
 if __name__ == '__main__':
     # This is just a list of tests, don't try to run these tests with 'python test_cache_nomap.py'
     # since cache server is required to be brought up first
@@ -2414,3 +2484,5 @@ if __name__ == '__main__':
     test_cache_nomap_pyfunc_lambda()
     test_cache_nomap_pyfunc_builtin()
     test_cache_nomap_pyfunc_function()
+    test_cache_nomap_dataset_size1()
+    test_cache_nomap_dataset_size2()