From b4b73b16c907d5b99d977ffc7e3ea746afcba870 Mon Sep 17 00:00:00 2001 From: Lixia Chen Date: Thu, 10 Dec 2020 17:38:45 -0500 Subject: [PATCH] Detect interrupt and clean up BatchWait and other minor changes per the request of HQ testers --- .../dataset/engine/cache/cache_admin_arg.cc | 4 ++++ .../dataset/engine/cache/cache_client.cc | 8 +++++++- .../dataset/engine/cache/cache_main.cc | 2 +- .../dataset/engine/cache/cache_server.cc | 18 +++++++++--------- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index 19ab9290539..a0c33daea0d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -298,6 +298,7 @@ Status CacheAdminArgHandler::Validate() { // Any unhandled arguments at this point is an error. if (!trailing_args_.empty()) { std::string err_msg = "Invalid arguments provided: " + trailing_args_; + err_msg += "\nPlease try `cache_admin --help` for more information"; return Status(StatusCode::kSyntaxError, err_msg); } @@ -540,8 +541,11 @@ void CacheAdminArgHandler::Help() { std::cerr << " [[-s | --spilldir] ] Default is " << DefaultSpillDir() << ".\n"; std::cerr << " [[-l | --loglevel] ] Default is 1 (warning level).\n"; std::cerr << " [--destroy_session | -d] \n"; + std::cerr << " [[-p | --port] ]\n"; std::cerr << " [--generate_session | -g]\n"; + std::cerr << " [[-p | --port] ]\n"; std::cerr << " [--list_sessions]\n"; + std::cerr << " [[-p | --port] ]\n"; std::cerr << " [--help]" << std::endl; // Do not expose these option to the user via help or documentation, but the options do exist to aid with // development and tuning. 
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc index 9e8d9a02d83..d2f50bf254c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc @@ -323,6 +323,7 @@ Status CacheClient::BuildPhaseDone() const { Status CacheClient::PushRequest(std::shared_ptr rq) const { return comm_->HandleRequest(std::move(rq)); } void CacheClient::ServerRunningOutOfResources() { + MS_LOG(WARNING) << "Cache server runs out of memory or disk space to cache any more rows!\n"; bool expected = true; if (fetch_all_keys_.compare_exchange_strong(expected, false)) { Status rc; @@ -509,7 +510,12 @@ Status CacheClient::AsyncBufferStream::SyncFlush(bool blocking) { } else { // Some clients are late and aren't done yet. Let go of the lock. lock.Unlock(); - retry = true; + if (this_thread::is_interrupted()) { + retry = false; + flush_rc_ = Status(StatusCode::kInterrupted); + } else { + retry = true; + } writer_wp_.Set(); std::this_thread::yield(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc index 47c82de5913..7550d25799c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc @@ -99,7 +99,6 @@ ds::Status StartServer(int argc, char **argv) { std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server " "logs (under " << ds::DefaultLogDir() << ") for any issues that may happen after startup\n"; - MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl; signal(SIGCHLD, SIG_IGN); // ignore sig child signal. 
return ds::Status::OK(); } else { @@ -119,6 +118,7 @@ ds::Status StartServer(int argc, char **argv) { } // Dump the summary + MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl; MS_LOG(WARNING) << "Logging services started with log level: " << argv[5]; MS_LOG(WARNING) << builder << std::endl; // Create the instance with some sanity checks built in diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index f2efb956179..c5b0f06a9b8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -403,7 +403,7 @@ Status CacheServer::BatchFetch(const std::shared_ptr(fbb->GetBufferPointer()); const auto num_elements = p->rows()->size(); auto connection_id = p->connection_id(); - BatchWait batch_wait = BatchWait(num_elements); + auto batch_wait = std::make_unique(num_elements); int64_t data_offset = (num_elements + 1) * sizeof(int64_t); auto *offset_array = reinterpret_cast(out->GetMutablePointer()); offset_array[0] = data_offset; @@ -439,17 +439,17 @@ Status CacheServer::BatchFetch(const std::shared_ptrrq_.add_buf_data(fb2.GetBufferPointer(), fb2.GetSize()); - cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast(&batch_wait))); + cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast(batch_wait.get()))); RETURN_IF_NOT_OK(PushRequest(worker_id, cache_rq)); } else { // Nothing to fetch but we still need to post something back into the wait area. - RETURN_IF_NOT_OK(batch_wait.Set(Status::OK())); + RETURN_IF_NOT_OK(batch_wait->Set(Status::OK())); } } // Now wait for all of them to come back. 
- RETURN_IF_NOT_OK(batch_wait.Wait()); + RETURN_IF_NOT_OK(batch_wait->Wait()); // Return the result - return batch_wait.GetRc(); + return batch_wait->GetRc(); } Status CacheServer::BatchFetchRows(CacheRequest *rq, CacheReply *reply) { @@ -733,7 +733,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10); auto p = reinterpret_cast(reinterpret_cast(base) + offset_addr); num_elem = strtol(rq->buf_data(2).data(), nullptr, 10); - BatchWait batch_wait = BatchWait(num_elem); + auto batch_wait = std::make_unique(num_elem); // Get a set of free request and push into the queues. for (auto i = 0; i < num_elem; ++i) { auto start = reinterpret_cast(p); @@ -754,13 +754,13 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { cache_rq->rq_.add_buf_data(cookie); cache_rq->rq_.add_buf_data(std::to_string(start - reinterpret_cast(base))); cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast(p - start))); - cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast(&batch_wait))); + cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast(batch_wait.get()))); RETURN_IF_NOT_OK(PushRequest(GetRandomWorker(), cache_rq)); } // Now wait for all of them to come back. - RETURN_IF_NOT_OK(batch_wait.Wait()); + RETURN_IF_NOT_OK(batch_wait->Wait()); // Return the result - return batch_wait.GetRc(); + return batch_wait->GetRc(); } catch (const std::exception &e) { RETURN_STATUS_UNEXPECTED(e.what()); }