From 32b82c2737bcdbfa3bf1216e227bc81f4656fea1 Mon Sep 17 00:00:00 2001 From: Lixia Chen Date: Tue, 8 Dec 2020 12:05:43 -0500 Subject: [PATCH] Fix a core dump in TreeConsumer::Terminate() and other minor cache changes --- .../dataset/engine/cache/cache_admin_arg.cc | 30 +++---- .../dataset/engine/cache/cache_main.cc | 4 +- .../dataset/engine/cache/cache_server.cc | 2 +- .../dataset/engine/consumers/tree_consumer.cc | 5 +- tests/ut/cpp/dataset/c_api_cache_test.cc | 83 ++++++++++++++++++- tests/ut/python/dataset/test_cache_map.py | 4 + 6 files changed, 104 insertions(+), 24 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index 58e41331b1f..19ab9290539 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -355,7 +355,7 @@ Status CacheAdminArgHandler::RunCommand() { // The server will send a message back and remove the queue and we will then wake up. But on the safe // side, we will also set up an alarm and kill this process if we hang on // the message queue. - alarm(30); + alarm(60); Status dummy_rc; (void)msg.ReceiveStatus(&dummy_rc); std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl; @@ -533,24 +533,16 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) { void CacheAdminArgHandler::Help() { std::cerr << "Syntax:\n"; - std::cerr << " cache_admin [--start | --stop]\n"; - std::cerr << " [ [-h | --hostname] ]\n"; - std::cerr << " Default is " << kCfgDefaultCacheHost << ".\n"; - std::cerr << " [ [-p | --port] ]\n"; - std::cerr << " Possible values are in range [1025..65535].\n"; - std::cerr << " Default is " << kCfgDefaultCachePort << ".\n"; - std::cerr << " [ [-g | --generate_session] ]\n"; - std::cerr << " [ [-d | --destroy_session] ]\n"; - std::cerr << " [ [-w | --workers] ]\n"; - std::cerr << " Possible values are in range [1...max(100, Number of CPU)].\n"; - std::cerr << " Default is " << kDefaultNumWorkers << ".\n"; - std::cerr << " [ [-s | --spilldir] ]\n"; - std::cerr << " Default is " << DefaultSpillDir() << ".\n"; - std::cerr << " [ [-l | --loglevel] ]\n"; - std::cerr << " Possible values are 0, 1, 2 and 3.\n"; - std::cerr << " Default is 1 (info level).\n"; - std::cerr << " [--list_sessions]\n"; - std::cerr << " [--help]" << std::endl; + std::cerr << "cache_admin [--start | --stop]\n"; + std::cerr << " [[-h | --hostname] ] Default is " << kCfgDefaultCacheHost << ".\n"; + std::cerr << " [[-p | --port] ] Default is " << kCfgDefaultCachePort << ".\n"; + std::cerr << " [[-w | --workers] ] Default is " << kDefaultNumWorkers << ".\n"; + std::cerr << " [[-s | --spilldir] ] Default is " << DefaultSpillDir() << ".\n"; + std::cerr << " [[-l | --loglevel] ] Default is 1 (warning level).\n"; + std::cerr << " [--destroy_session | -d] \n"; + std::cerr << " [--generate_session | -g]\n"; + std::cerr << " [--list_sessions]\n"; + std::cerr << " [--help]" << std::endl; // Do not expose these option to the user via help or documentation, but the options do exist to aid with // development and tuning. // [ [-m | --shared_memory_size] ] diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc index 93f92e6b972..47c82de5913 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc @@ -119,8 +119,8 @@ ds::Status StartServer(int argc, char **argv) { } // Dump the summary - MS_LOG(INFO) << "Logging services started with log level: " << argv[5]; - MS_LOG(INFO) << builder << std::endl; + MS_LOG(WARNING) << "Logging services started with log level: " << argv[5]; + MS_LOG(WARNING) << builder << std::endl; // Create the instance with some sanity checks built in rc = builder.Build(); if (rc.IsOk()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index 29bf177cad0..f2efb956179 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -635,7 +635,7 @@ Status CacheServer::GetCacheMissKeys(CacheRequest *rq, CacheReply *reply) { inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) { reply->set_result(std::to_string(session_id)); - MS_LOG(INFO) << "Server generated new session id " << session_id; + MS_LOG(WARNING) << "Server generated new session id " << session_id; return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc index 101f9d836e3..bdaec513f06 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc @@ -36,7 +36,10 @@ namespace dataset { TreeConsumer::TreeConsumer() { tree_adapter_ = std::make_unique(); } Status TreeConsumer::Init(std::shared_ptr d) { return tree_adapter_->Compile(std::move(d)); } -Status TreeConsumer::Terminate() { return tree_adapter_->AllTasks()->ServiceStop(); } +Status TreeConsumer::Terminate() { + CHECK_FAIL_RETURN_UNEXPECTED(tree_adapter_->AllTasks() != nullptr, " Execution tree has not been built"); + return tree_adapter_->AllTasks()->ServiceStop(); +} // IteratorConsumer Status IteratorConsumer::Init(std::shared_ptr d) { diff --git a/tests/ut/cpp/dataset/c_api_cache_test.cc b/tests/ut/cpp/dataset/c_api_cache_test.cc index a769c048f7f..f149d499ce1 100644 --- a/tests/ut/cpp/dataset/c_api_cache_test.cc +++ b/tests/ut/cpp/dataset/c_api_cache_test.cc @@ -15,6 +15,7 @@ */ #include "common/common.h" #include "minddata/dataset/include/datasets.h" +#include "minddata/dataset/include/vision.h" #include "minddata/dataset/engine/ir/datasetops/source/csv_node.h" @@ -51,6 +52,34 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiSamplerNull) { EXPECT_EQ(iter, nullptr); } +TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiNestedCache) { + session_id_type env_session; + Status s = GetSessionFromEnv(&env_session); + EXPECT_EQ(s, Status::OK()); + + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + EXPECT_NE(some_cache, nullptr); + + // Create an ImageFolder Dataset, this folder_path only has 2 images in it + std::string folder_path = datasets_root_path_ + "/testImageNetData/train/"; + std::shared_ptr ds = ImageFolder(folder_path, false, RandomSampler(), {}, {}, some_cache); + EXPECT_NE(ds, nullptr); + + // Create objects for the tensor ops + std::shared_ptr decode_op = vision::Decode(); + EXPECT_NE(decode_op, nullptr); + + // Create a Map operation on ds + ds = ds->Map({decode_op}, {}, {}, {"image"}, some_cache); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + // Now in the cache_error_pass would fail and we would end up with a nullptr iter. + std::shared_ptr iter = ds->CreateIterator(); + EXPECT_EQ(iter, nullptr); +} + TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) { session_id_type env_session; Status s = GetSessionFromEnv(&env_session); @@ -736,7 +765,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) { iter->Stop(); } -TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare) { +TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare1) { session_id_type env_session; Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); @@ -788,6 +817,58 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare) { iter2->Stop(); } +TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare2) { + session_id_type env_session; + Status s = GetSessionFromEnv(&env_session); + EXPECT_EQ(s, Status::OK()); + + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + EXPECT_NE(some_cache, nullptr); + + // Create an ImageFolder Dataset, this folder_path only has 2 images in it + std::string folder_path = datasets_root_path_ + "/testImageNetData/train/"; + // The first pipeline is ImageFolder with RandomSampler, the second pipeline is ImageFolder with SequentialSampler + // Since sampler does not influence the data in the source, these two pipelines can share a common cache. + std::shared_ptr ds1 = ImageFolder(folder_path, true, RandomSampler(), {}, {}, some_cache); + EXPECT_NE(ds1, nullptr); + std::shared_ptr ds2 = ImageFolder(folder_path, true, SequentialSampler(), {}, {}, some_cache); + EXPECT_NE(ds2, nullptr); + + // Create and launch the Execution Tree for ds1 + std::shared_ptr iter1 = ds1->CreateIterator(); + EXPECT_NE(iter1, nullptr); + // Iterate the dataset and get each row + std::unordered_map> row; + iter1->GetNextRow(&row); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row["image"]; + iter1->GetNextRow(&row); + } + EXPECT_EQ(i, 2); + // Manually terminate the pipeline + iter1->Stop(); + + // Create and launch the Execution Tree for ds2 + std::shared_ptr iter2 = ds2->CreateIterator(); + EXPECT_NE(iter2, nullptr); + // Iterate the dataset and get each row + iter2->GetNextRow(&row); + + i = 0; + while (row.size() != 0) { + i++; + auto image = row["image"]; + iter2->GetNextRow(&row); + } + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter2->Stop(); +} + TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) { session_id_type env_session; Status s = GetSessionFromEnv(&env_session); diff --git a/tests/ut/python/dataset/test_cache_map.py b/tests/ut/python/dataset/test_cache_map.py index 36717b951a2..6754936fb5a 100644 --- a/tests/ut/python/dataset/test_cache_map.py +++ b/tests/ut/python/dataset/test_cache_map.py @@ -233,6 +233,10 @@ def test_cache_map_failure1(): ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) ds1 = ds1.repeat(4) + with pytest.raises(RuntimeError) as e: + ds1.get_batch_size() + assert "Nested cache operations" in str(e.value) + with pytest.raises(RuntimeError) as e: num_iter = 0 for _ in ds1.create_dict_iterator(num_epochs=1):