fix error msg issues from test team and move hostname/port to CacheClientGreeter

This commit is contained in:
Lixia Chen 2020-11-30 12:49:40 -05:00 committed by tinazhang
parent 6f31bdbf67
commit 1b19118adc
9 changed files with 27 additions and 28 deletions

View File

@ -352,7 +352,7 @@ Status CacheAdminArgHandler::RunCommand() {
// have to wait for its complete shutdown because the server will shutdown // have to wait for its complete shutdown because the server will shutdown
// the comm layer as soon as the request is received, and we need to wait // the comm layer as soon as the request is received, and we need to wait
// on the message queue instead. // on the message queue instead.
// The server will remove the queue and we will then wake up. But on the safe // The server will send a message back and remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this process if we hang on // side, we will also set up an alarm and kill this process if we hang on
// the message queue. // the message queue.
alarm(30); alarm(30);
@ -487,8 +487,7 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
if (WIFEXITED(status)) { if (WIFEXITED(status)) {
auto exit_status = WEXITSTATUS(status); auto exit_status = WEXITSTATUS(status);
if (exit_status) { if (exit_status) {
std::string errMsg = msg + "\nChild exit status " + std::to_string(exit_status); return Status(StatusCode::kUnexpectedError, msg);
return Status(StatusCode::kUnexpectedError, errMsg);
} else { } else {
// Not an error, some info message goes to stdout // Not an error, some info message goes to stdout
std::cout << msg << std::endl; std::cout << msg << std::endl;

View File

@ -61,13 +61,11 @@ CacheClient::CacheClient(session_id_type session_id, uint64_t cache_mem_sz, bool
spill_(spill), spill_(spill),
client_id_(-1), client_id_(-1),
local_bypass_(false), local_bypass_(false),
hostname_(std::move(hostname)),
port_(port),
num_connections_(num_connections), num_connections_(num_connections),
prefetch_size_(prefetch_size), prefetch_size_(prefetch_size),
fetch_all_keys_(true) { fetch_all_keys_(true) {
cinfo_.set_session_id(session_id); cinfo_.set_session_id(session_id);
comm_ = std::make_shared<CacheClientGreeter>(hostname_, port_, num_connections_); comm_ = std::make_shared<CacheClientGreeter>(hostname, port, num_connections_);
} }
CacheClient::~CacheClient() { CacheClient::~CacheClient() {
@ -99,8 +97,7 @@ CacheClient::~CacheClient() {
void CacheClient::Print(std::ostream &out) const { void CacheClient::Print(std::ostream &out) const {
out << " Session id: " << session_id() << "\n Cache crc: " << cinfo_.crc() out << " Session id: " << session_id() << "\n Cache crc: " << cinfo_.crc()
<< "\n Server cache id: " << server_connection_id_ << "\n Cache mem size: " << GetCacheMemSz() << "\n Server cache id: " << server_connection_id_ << "\n Cache mem size: " << GetCacheMemSz()
<< "\n Spilling: " << std::boolalpha << isSpill() << "\n Hostname: " << GetHostname() << "\n Spilling: " << std::boolalpha << isSpill() << "\n Number of rpc workers: " << GetNumConnections()
<< "\n Port: " << GetPort() << "\n Number of rpc workers: " << GetNumConnections()
<< "\n Prefetch size: " << GetPrefetchSize() << "\n Local client support: " << std::boolalpha << "\n Prefetch size: " << GetPrefetchSize() << "\n Local client support: " << std::boolalpha
<< SupportLocalClient(); << SupportLocalClient();
} }
@ -251,7 +248,7 @@ Status CacheClient::CreateCache(uint32_t tree_crc, bool generate_id) {
} }
if (success) { if (success) {
// Attach to shared memory for local client // Attach to shared memory for local client
RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(port_, &local_bypass_)); RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(&local_bypass_));
if (local_bypass_) { if (local_bypass_) {
async_buffer_stream_ = std::make_shared<AsyncBufferStream>(); async_buffer_stream_ = std::make_shared<AsyncBufferStream>();
RETURN_IF_NOT_OK(async_buffer_stream_->Init(this)); RETURN_IF_NOT_OK(async_buffer_stream_->Init(this));

View File

@ -234,8 +234,6 @@ class CacheClient {
session_id_type session_id() const { return cinfo_.session_id(); } session_id_type session_id() const { return cinfo_.session_id(); }
uint64_t GetCacheMemSz() const { return cache_mem_sz_; } uint64_t GetCacheMemSz() const { return cache_mem_sz_; }
bool isSpill() const { return spill_; } bool isSpill() const { return spill_; }
const std::string &GetHostname() const { return hostname_; }
int32_t GetPort() const { return port_; }
int32_t GetNumConnections() const { return num_connections_; } int32_t GetNumConnections() const { return num_connections_; }
int32_t GetPrefetchSize() const { return prefetch_size_; } int32_t GetPrefetchSize() const { return prefetch_size_; }
int32_t GetClientId() const { return client_id_; } int32_t GetClientId() const { return client_id_; }
@ -288,8 +286,6 @@ class CacheClient {
std::vector<int32_t> cpu_list_; std::vector<int32_t> cpu_list_;
// Comm layer // Comm layer
bool local_bypass_; bool local_bypass_;
std::string hostname_;
int32_t port_;
int32_t num_connections_; int32_t num_connections_;
int32_t prefetch_size_; int32_t prefetch_size_;
mutable std::shared_ptr<CacheClientGreeter> comm_; mutable std::shared_ptr<CacheClientGreeter> comm_;

View File

@ -20,12 +20,12 @@ namespace dataset {
CacheClientGreeter::~CacheClientGreeter() { (void)ServiceStop(); } CacheClientGreeter::~CacheClientGreeter() { (void)ServiceStop(); }
CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port, int32_t num_connections) CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port, int32_t num_connections)
: num_connections_(num_connections), request_cnt_(0), port_(port) { : num_connections_(num_connections), request_cnt_(0), hostname_(std::move(hostname)), port_(port) {
grpc::ChannelArguments args; grpc::ChannelArguments args;
// We need to bump up the message size to unlimited. The default receiving // We need to bump up the message size to unlimited. The default receiving
// message limit is 4MB which is not big enough. // message limit is 4MB which is not big enough.
args.SetMaxReceiveMessageSize(-1); args.SetMaxReceiveMessageSize(-1);
MS_LOG(INFO) << "Hostname: " << hostname << "."; MS_LOG(INFO) << "Hostname: " << hostname_ << ", port: " << std::to_string(port_);
#if CACHE_LOCAL_CLIENT #if CACHE_LOCAL_CLIENT
// Try connect locally to the unix_socket first as the first preference // Try connect locally to the unix_socket first as the first preference
// Need to resolve hostname to ip address rather than to do a string compare // Need to resolve hostname to ip address rather than to do a string compare
@ -42,11 +42,11 @@ CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port
stub_ = CacheServerGreeter::NewStub(channel_); stub_ = CacheServerGreeter::NewStub(channel_);
} }
Status CacheClientGreeter::AttachToSharedMemory(int32_t port, bool *local_bypass) { Status CacheClientGreeter::AttachToSharedMemory(bool *local_bypass) {
*local_bypass = false; *local_bypass = false;
#if CACHE_LOCAL_CLIENT #if CACHE_LOCAL_CLIENT
SharedMemory::shm_key_t shm_key; SharedMemory::shm_key_t shm_key;
RETURN_IF_NOT_OK(PortToFtok(port, &shm_key)); RETURN_IF_NOT_OK(PortToFtok(port_, &shm_key));
// Attach to the shared memory // Attach to the shared memory
mem_.SetPublicKey(shm_key); mem_.SetPublicKey(shm_key);
RETURN_IF_NOT_OK(mem_.Attach()); RETURN_IF_NOT_OK(mem_.Attach());
@ -120,7 +120,7 @@ Status CacheClientGreeter::WorkerEntry() {
std::string err_msg; std::string err_msg;
if (error_code == grpc::StatusCode::UNAVAILABLE) { if (error_code == grpc::StatusCode::UNAVAILABLE) {
err_msg = "Cache server with port " + std::to_string(port_) + err_msg = "Cache server with port " + std::to_string(port_) +
" is unreachable. Make sure the server is running. GRPC Code" + std::to_string(error_code); " is unreachable. Make sure the server is running. GRPC Code " + std::to_string(error_code);
} else { } else {
err_msg = rq->rc_.error_message() + ". GRPC Code " + std::to_string(error_code); err_msg = rq->rc_.error_message() + ". GRPC Code " + std::to_string(error_code);
} }

View File

@ -80,7 +80,7 @@ class CacheClientGreeter : public Service {
/// \brief Attach to shared memory for local client /// \brief Attach to shared memory for local client
/// \note Called after we have established a connection. /// \note Called after we have established a connection.
/// \return Status object. /// \return Status object.
Status AttachToSharedMemory(int32_t port, bool *local_bypass); Status AttachToSharedMemory(bool *local_bypass);
/// \brief This returns where we attach to the shared memory. /// \brief This returns where we attach to the shared memory.
/// \return Base address of the shared memory. /// \return Base address of the shared memory.
@ -97,6 +97,7 @@ class CacheClientGreeter : public Service {
std::map<int64_t, std::unique_ptr<CacheClientRequestTag>> req_; std::map<int64_t, std::unique_ptr<CacheClientRequestTag>> req_;
SharedMemory mem_; SharedMemory mem_;
int32_t port_; int32_t port_;
std::string hostname_;
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

View File

@ -255,7 +255,7 @@ Status CreateCacheRequest::PostReply() {
if (err == -1) { if (err == -1) {
RETURN_STATUS_UNEXPECTED("Unable to set affinity. Errno = " + std::to_string(errno)); RETURN_STATUS_UNEXPECTED("Unable to set affinity. Errno = " + std::to_string(errno));
} }
MS_LOG(WARNING) << "Changing cpu affinity to the following list of cpu id: " + c_list; MS_LOG(INFO) << "Changing cpu affinity to the following list of cpu id: " + c_list;
} }
#endif #endif

View File

@ -167,6 +167,7 @@ Status CacheServer::DoServiceStop() {
// Finally wake up cache_admin if it is waiting // Finally wake up cache_admin if it is waiting
for (int32_t qID : shutdown_qIDs_) { for (int32_t qID : shutdown_qIDs_) {
SharedMessage msg(qID); SharedMessage msg(qID);
msg.SendStatus(Status::OK());
msg.RemoveResourcesOnExit(); msg.RemoveResourcesOnExit();
// Let msg goes out of scope which will destroy the queue. // Let msg goes out of scope which will destroy the queue.
} }

View File

@ -34,7 +34,7 @@ class CacheClientGreeter : public Service {
void *SharedMemoryBaseAddr() { return nullptr; } void *SharedMemoryBaseAddr() { return nullptr; }
Status HandleRequest(std::shared_ptr<BaseRequest> rq) { RETURN_STATUS_UNEXPECTED("Not supported"); } Status HandleRequest(std::shared_ptr<BaseRequest> rq) { RETURN_STATUS_UNEXPECTED("Not supported"); }
Status AttachToSharedMemory(int32_t port, bool *local_bypass) { RETURN_STATUS_UNEXPECTED("Not supported"); } Status AttachToSharedMemory(bool *local_bypass) { RETURN_STATUS_UNEXPECTED("Not supported"); }
protected: protected:
private: private:

View File

@ -36,7 +36,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<CacheOp> node, bool *modifie
// Returns an error if ZipOp exists under a cache // Returns an error if ZipOp exists under a cache
Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ZipOp> node, bool *modified) { Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ZipOp> node, bool *modified) {
if (is_cached_) { if (is_cached_) {
RETURN_STATUS_UNEXPECTED("ZipOp is currently not supported as a descendant operator under a cache."); return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"ZipOp is currently not supported as a descendant operator under a cache.");
} }
return Status::OK(); return Status::OK();
@ -48,8 +49,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<MapOp> node, bool *modified)
auto tfuncs = node->TFuncs(); auto tfuncs = node->TFuncs();
for (size_t i = 0; i < tfuncs.size(); i++) { for (size_t i = 0; i < tfuncs.size(); i++) {
if (!tfuncs[i]->Deterministic()) { if (!tfuncs[i]->Deterministic()) {
RETURN_STATUS_UNEXPECTED( return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"MapOp with non-deterministic TensorOps is currently not supported as a descendant of cache."); "MapOp with non-deterministic TensorOps is currently not supported as a descendant of cache.");
} }
} }
} }
@ -59,7 +60,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<MapOp> node, bool *modified)
// Returns an error if ConcatOp exists under a cache // Returns an error if ConcatOp exists under a cache
Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ConcatOp> node, bool *modified) { Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ConcatOp> node, bool *modified) {
if (is_cached_) { if (is_cached_) {
RETURN_STATUS_UNEXPECTED("ConcatOp is currently not supported as a descendant operator under a cache."); return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"ConcatOp is currently not supported as a descendant operator under a cache.");
} }
return Status::OK(); return Status::OK();
@ -68,7 +70,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<ConcatOp> node, bool *modifi
// Returns an error if TakeOp exists under a cache // Returns an error if TakeOp exists under a cache
Status CacheErrorPass::PreRunOnNode(std::shared_ptr<TakeOp> node, bool *modified) { Status CacheErrorPass::PreRunOnNode(std::shared_ptr<TakeOp> node, bool *modified) {
if (is_cached_) { if (is_cached_) {
RETURN_STATUS_UNEXPECTED("TakeOp/SplitOp is currently not supported as a descendant operator under a cache."); return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"TakeOp/SplitOp is currently not supported as a descendant operator under a cache.");
} }
return Status::OK(); return Status::OK();
@ -77,7 +80,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<TakeOp> node, bool *modified
// Returns an error if SkipOp exists under a cache // Returns an error if SkipOp exists under a cache
Status CacheErrorPass::PreRunOnNode(std::shared_ptr<SkipOp> node, bool *modified) { Status CacheErrorPass::PreRunOnNode(std::shared_ptr<SkipOp> node, bool *modified) {
if (is_cached_) { if (is_cached_) {
RETURN_STATUS_UNEXPECTED("SkipOp is currently not supported as a descendant operator under a cache."); return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"SkipOp is currently not supported as a descendant operator under a cache.");
} }
return Status::OK(); return Status::OK();
@ -87,7 +91,8 @@ Status CacheErrorPass::PreRunOnNode(std::shared_ptr<SkipOp> node, bool *modified
// Returns an error if FilterOp exists under a cache // Returns an error if FilterOp exists under a cache
Status CacheErrorPass::PreRunOnNode(std::shared_ptr<FilterOp> node, bool *modified) { Status CacheErrorPass::PreRunOnNode(std::shared_ptr<FilterOp> node, bool *modified) {
if (is_cached_) { if (is_cached_) {
RETURN_STATUS_UNEXPECTED("FilterOp is currently not supported as a descendant operator under a cache."); return Status(StatusCode::kNotImplementedYet, __LINE__, __FILE__,
"FilterOp is currently not supported as a descendant operator under a cache.");
} }
return Status::OK(); return Status::OK();