From 1b19118adc79b178110c8fbe947a3b148ec1da30 Mon Sep 17 00:00:00 2001
From: Lixia Chen
Date: Mon, 30 Nov 2020 12:49:40 -0500
Subject: [PATCH] fix error msg issues from test team and move hostname/port to CacheClientGreeter

---
 .../dataset/engine/cache/cache_admin_arg.cc   |  5 ++---
 .../dataset/engine/cache/cache_client.cc      |  9 +++------
 .../dataset/engine/cache/cache_client.h       |  4 ----
 .../dataset/engine/cache/cache_grpc_client.cc | 10 +++++-----
 .../dataset/engine/cache/cache_grpc_client.h  |  3 ++-
 .../dataset/engine/cache/cache_request.cc     |  2 +-
 .../dataset/engine/cache/cache_server.cc      |  1 +
 .../engine/cache/stub/cache_grpc_client.h     |  2 +-
 .../engine/opt/pre/cache_error_pass.cc        | 19 ++++++++++++-------
 9 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
index 6fa579df2cf..58e41331b1f 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
@@ -352,7 +352,7 @@ Status CacheAdminArgHandler::RunCommand() {
   // have to wait for its complete shutdown because the server will shutdown
   // the comm layer as soon as the request is received, and we need to wait
   // on the message queue instead.
-  // The server will remove the queue and we will then wake up. But on the safe
+  // The server will send a message back and remove the queue and we will then wake up. But on the safe
   // side, we will also set up an alarm and kill this process if we hang on
   // the message queue.
   alarm(30);
@@ -487,8 +487,7 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
     if (WIFEXITED(status)) {
       auto exit_status = WEXITSTATUS(status);
       if (exit_status) {
-        std::string errMsg = msg + "\nChild exit status " + std::to_string(exit_status);
-        return Status(StatusCode::kUnexpectedError, errMsg);
+        return Status(StatusCode::kUnexpectedError, msg);
       } else {
         // Not an error, some info message goes to stdout
         std::cout << msg << std::endl;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc
index 158db2bf431..65ebf45dca8 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc
@@ -61,13 +61,11 @@ CacheClient::CacheClient(session_id_type session_id, uint64_t cache_mem_sz, bool
       spill_(spill),
       client_id_(-1),
       local_bypass_(false),
-      hostname_(std::move(hostname)),
-      port_(port),
       num_connections_(num_connections),
       prefetch_size_(prefetch_size),
       fetch_all_keys_(true) {
   cinfo_.set_session_id(session_id);
-  comm_ = std::make_shared<CacheClientGreeter>(hostname_, port_, num_connections_);
+  comm_ = std::make_shared<CacheClientGreeter>(hostname, port, num_connections_);
 }

 CacheClient::~CacheClient() {
@@ -99,8 +97,7 @@ CacheClient::~CacheClient() {
 void CacheClient::Print(std::ostream &out) const {
   out << " Session id: " << session_id() << "\n Cache crc: " << cinfo_.crc()
       << "\n Server cache id: " << server_connection_id_ << "\n Cache mem size: " << GetCacheMemSz()
-      << "\n Spilling: " << std::boolalpha << isSpill() << "\n Hostname: " << GetHostname()
-      << "\n Port: " << GetPort() << "\n Number of rpc workers: " << GetNumConnections()
+      << "\n Spilling: " << std::boolalpha << isSpill() << "\n Number of rpc workers: " << GetNumConnections()
       << "\n Prefetch size: " << GetPrefetchSize() << "\n Local client support: " << std::boolalpha
       << SupportLocalClient();
 }
@@ -251,7 +248,7 @@ Status CacheClient::CreateCache(uint32_t tree_crc, bool generate_id) {
   }
   if (success) {
     // Attach to shared memory for local client
-    RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(port_, &local_bypass_));
+    RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(&local_bypass_));
     if (local_bypass_) {
       async_buffer_stream_ = std::make_shared();
       RETURN_IF_NOT_OK(async_buffer_stream_->Init(this));
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h
index c0d9d50ce98..8698fc4763c 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h
@@ -234,8 +234,6 @@ class CacheClient {
   session_id_type session_id() const { return cinfo_.session_id(); }
   uint64_t GetCacheMemSz() const { return cache_mem_sz_; }
   bool isSpill() const { return spill_; }
-  const std::string &GetHostname() const { return hostname_; }
-  int32_t GetPort() const { return port_; }
   int32_t GetNumConnections() const { return num_connections_; }
   int32_t GetPrefetchSize() const { return prefetch_size_; }
   int32_t GetClientId() const { return client_id_; }
@@ -288,8 +286,6 @@ class CacheClient {
   std::vector cpu_list_;
   // Comm layer
   bool local_bypass_;
-  std::string hostname_;
-  int32_t port_;
   int32_t num_connections_;
   int32_t prefetch_size_;
   mutable std::shared_ptr comm_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc
index 520e62bcd7d..27e73b294d7 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc
@@ -20,12 +20,12 @@ namespace dataset {
 CacheClientGreeter::~CacheClientGreeter() { (void)ServiceStop(); }

 CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port, int32_t num_connections)
-    : num_connections_(num_connections), request_cnt_(0), port_(port) {
+    : num_connections_(num_connections), request_cnt_(0), hostname_(std::move(hostname)), port_(port) {
   grpc::ChannelArguments args;
   // We need to bump up the message size to unlimited. The default receiving
   // message limit is 4MB which is not big enough.
   args.SetMaxReceiveMessageSize(-1);
-  MS_LOG(INFO) << "Hostname: " << hostname << ".";
+  MS_LOG(INFO) << "Hostname: " << hostname_ << ", port: " << std::to_string(port_);
 #if CACHE_LOCAL_CLIENT
   // Try connect locally to the unix_socket first as the first preference
   // Need to resolve hostname to ip address rather than to do a string compare
@@ -42,11 +42,11 @@ CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port
   stub_ = CacheServerGreeter::NewStub(channel_);
 }

-Status CacheClientGreeter::AttachToSharedMemory(int32_t port, bool *local_bypass) {
+Status CacheClientGreeter::AttachToSharedMemory(bool *local_bypass) {
   *local_bypass = false;
 #if CACHE_LOCAL_CLIENT
   SharedMemory::shm_key_t shm_key;
-  RETURN_IF_NOT_OK(PortToFtok(port, &shm_key));
+  RETURN_IF_NOT_OK(PortToFtok(port_, &shm_key));
   // Attach to the shared memory
   mem_.SetPublicKey(shm_key);
   RETURN_IF_NOT_OK(mem_.Attach());
@@ -120,7 +120,7 @@ Status CacheClientGreeter::WorkerEntry() {
       std::string err_msg;
       if (error_code == grpc::StatusCode::UNAVAILABLE) {
         err_msg = "Cache server with port " + std::to_string(port_) +
-                  " is unreachable. Make sure the server is running. GRPC Code" + std::to_string(error_code);
+                  " is unreachable. Make sure the server is running. GRPC Code " + std::to_string(error_code);
       } else {
         err_msg = rq->rc_.error_message() + ". GRPC Code " + std::to_string(error_code);
       }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.h
index b250788bc11..0d4d54e6faa 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.h
@@ -80,7 +80,7 @@ class CacheClientGreeter : public Service {
   /// \brief Attach to shared memory for local client
   /// \note Called after we have established a connection.
   /// \return Status object.
-  Status AttachToSharedMemory(int32_t port, bool *local_bypass);
+  Status AttachToSharedMemory(bool *local_bypass);

   /// \brief This returns where we attach to the shared memory.
   /// \return Base address of the shared memory.
@@ -97,6 +97,7 @@ class CacheClientGreeter : public Service {
   std::map> req_;
   SharedMemory mem_;
   int32_t port_;
+  std::string hostname_;
 };
 } // namespace dataset
 } // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.cc
index 9fb7b0623f9..a4bdbb7b4c8 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.cc
@@ -255,7 +255,7 @@ Status CreateCacheRequest::PostReply() {
     if (err == -1) {
       RETURN_STATUS_UNEXPECTED("Unable to set affinity. Errno = " + std::to_string(errno));
     }
-    MS_LOG(WARNING) << "Changing cpu affinity to the following list of cpu id: " + c_list;
+    MS_LOG(INFO) << "Changing cpu affinity to the following list of cpu id: " + c_list;
   }
 #endif

diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
index 3515d495de7..c4c57132b69 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
@@ -167,6 +167,7 @@ Status CacheServer::DoServiceStop() {
   // Finally wake up cache_admin if it is waiting
   for (int32_t qID : shutdown_qIDs_) {
     SharedMessage msg(qID);
+    msg.SendStatus(Status::OK());
     msg.RemoveResourcesOnExit();
     // Let msg goes out of scope which will destroy the queue.
   }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/stub/cache_grpc_client.h b/mindspore/ccsrc/minddata/dataset/engine/cache/stub/cache_grpc_client.h
index 2ad6cd045b4..fcd992797da 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/stub/cache_grpc_client.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/stub/cache_grpc_client.h
@@ -34,7 +34,7 @@ class CacheClientGreeter : public Service {
   void *SharedMemoryBaseAddr() { return nullptr; }
   Status HandleRequest(std::shared_ptr rq) { RETURN_STATUS_UNEXPECTED("Not supported"); }

-  Status AttachToSharedMemory(int32_t port, bool *local_bypass) { RETURN_STATUS_UNEXPECTED("Not supported"); }
+  Status AttachToSharedMemory(bool *local_bypass) { RETURN_STATUS_UNEXPECTED("Not supported"); }

 protected:
 private:
diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_error_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_error_pass.cc
index fe2dfc5adcb..99b4f2e45e0 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_error_pass.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_error_pass.cc
@@ -36,7 +36,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified
 // Returns an error if ZipOp exists under a cache
 Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ZipOp> node, bool *modified) {
   if (is_cached_) {
-    RETURN_STATUS_UNEXPECTED("ZipOp is currently not supported as a descendant operator under a cache.");
+    return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                  "ZipOp is currently not supported as a descendant operator under a cache.");
   }

   return Status::OK();
@@ -48,8 +49,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified)
     auto tfuncs = node->TFuncs();
     for (size_t i = 0; i < tfuncs.size(); i++) {
       if (!tfuncs[i]->Deterministic()) {
-        RETURN_STATUS_UNEXPECTED(
-          "MapOp with non-deterministic TensorOps is currently not supported as a descendant of cache.");
+        return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                      "MapOp with non-deterministic TensorOps is currently not supported as a descendant of cache.");
       }
     }
   }
@@ -59,7 +60,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified)
 // Returns an error if ConcatOp exists under a cache
 Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ConcatOp> node, bool *modified) {
   if (is_cached_) {
-    RETURN_STATUS_UNEXPECTED("ConcatOp is currently not supported as a descendant operator under a cache.");
+    return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                  "ConcatOp is currently not supported as a descendant operator under a cache.");
   }

   return Status::OK();
@@ -68,7 +70,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified
 // Returns an error if TakeOp exists under a cache
 Status CacheErrorPass::PreRunOnNode(std::shared_ptr<TakeOp> node, bool *modified) {
   if (is_cached_) {
-    RETURN_STATUS_UNEXPECTED("TakeOp/SplitOp is currently not supported as a descendant operator under a cache.");
+    return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                  "TakeOp/SplitOp is currently not supported as a descendant operator under a cache.");
   }

   return Status::OK();
@@ -77,7 +80,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified
 // Returns an error if SkipOp exists under a cache
 Status CacheErrorPass::PreRunOnNode(std::shared_ptr<SkipOp> node, bool *modified) {
   if (is_cached_) {
-    RETURN_STATUS_UNEXPECTED("SkipOp is currently not supported as a descendant operator under a cache.");
+    return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                  "SkipOp is currently not supported as a descendant operator under a cache.");
   }

   return Status::OK();
@@ -87,7 +91,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr node, bool *modified
 // Returns an error if FilterOp exists under a cache
 Status CacheErrorPass::PreRunOnNode(std::shared_ptr<FilterOp> node, bool *modified) {
   if (is_cached_) {
-    RETURN_STATUS_UNEXPECTED("FilterOp is currently not supported as a descendant operator under a cache.");
+    return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
+                  "FilterOp is currently not supported as a descendant operator under a cache.");
   }

   return Status::OK();
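
For context beyond the diff itself: the behavioral fix split between cache_admin_arg.cc and cache_server.cc is a small shutdown handshake. On shutdown the server now sends a status message back on the SysV message queue before removing it, while cache_admin blocks on that queue with a 30-second alarm as a safety net. The sketch below is a minimal, self-contained reproduction of that handshake using plain SysV message queues; StatusMsg, the ftok key, and the fork-based client/server split are illustrative assumptions, not the SharedMessage or CacheAdminArgHandler API.

    // Sketch only: shutdown handshake over a SysV message queue (Linux).
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <unistd.h>
    #include <csignal>
    #include <cstdio>
    #include <iostream>

    struct StatusMsg {
      long mtype;  // SysV requirement: first field, must be > 0 when sending
      int rc;      // status code the "server" reports back
    };

    int main() {
      key_t key = ftok("/tmp", 'q');  // hypothetical key; the real code derives one from the server port
      int qid = msgget(key, IPC_CREAT | 0600);
      if (qid == -1) { perror("msgget"); return 1; }

      if (fork() == 0) {
        // "Server" side (cf. DoServiceStop): report status first, then remove the queue.
        StatusMsg reply{1, 0};
        msgsnd(qid, &reply, sizeof(reply.rc), 0);
        msgctl(qid, IPC_RMID, nullptr);
        _exit(0);
      }

      // "Client" side (cf. RunCommand): block on the queue, but arm a 30s alarm so a
      // lost reply cannot hang this process forever.
      std::signal(SIGALRM, [](int) { _exit(1); });
      alarm(30);
      StatusMsg reply{};
      if (msgrcv(qid, &reply, sizeof(reply.rc), 0, 0) != -1) {
        std::cout << "server replied, rc=" << reply.rc << std::endl;
      } else {
        perror("msgrcv");  // EIDRM here means the queue was already removed: also a wake-up
      }
      alarm(0);
      return 0;
    }

Removing the queue (msgctl with IPC_RMID) also wakes any reader still blocked in msgrcv with EIDRM, which is why the updated comment in cache_admin_arg.cc treats both the reply and the queue removal as wake-up paths, with the alarm only as a last resort.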