forked from mindspore-Ecosystem/mindspore
Support retry to connect to tcp server
This commit is contained in:
parent
3b8e4e70ce
commit
e67be5f328
|
@ -47,27 +47,20 @@ void TCPClient::Finalize() {
|
|||
}
|
||||
}
|
||||
|
||||
bool TCPClient::Connect(const std::string &dst_url, size_t timeout_in_sec) {
|
||||
bool rt = false;
|
||||
tcp_comm_->Connect(dst_url);
|
||||
|
||||
size_t timeout_in_ms = timeout_in_sec * 1000;
|
||||
size_t sleep_in_ms = 100;
|
||||
useconds_t sleep_in_us = 100000;
|
||||
|
||||
while (true) {
|
||||
if (tcp_comm_->IsConnected(dst_url)) {
|
||||
rt = true;
|
||||
break;
|
||||
}
|
||||
if (timeout_in_ms > sleep_in_ms) {
|
||||
timeout_in_ms -= sleep_in_ms;
|
||||
bool TCPClient::Connect(const std::string &dst_url, size_t retry_count) {
|
||||
size_t interval = 5;
|
||||
for (size_t i = 0; i < retry_count; ++i) {
|
||||
if (tcp_comm_->Connect(dst_url)) {
|
||||
MS_LOG(INFO) << "Connected to the tcp server " << dst_url << " successfully.";
|
||||
return true;
|
||||
} else {
|
||||
break;
|
||||
MS_LOG(WARNING) << "Failed to connect to the tcp server : " << dst_url << ", retry to reconnect(" << (i + 1)
|
||||
<< "/" << retry_count << ")...";
|
||||
tcp_comm_->Disconnect(dst_url);
|
||||
sleep(interval);
|
||||
}
|
||||
(void)usleep(sleep_in_us);
|
||||
}
|
||||
return rt;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool TCPClient::Disconnect(const std::string &dst_url, size_t timeout_in_sec) {
|
||||
|
|
|
@ -38,7 +38,7 @@ class TCPClient {
|
|||
void Finalize();
|
||||
|
||||
// Connect to the specified server.
|
||||
bool Connect(const std::string &dst_url, size_t timeout_in_sec = 5);
|
||||
bool Connect(const std::string &dst_url, size_t retry_count = 60);
|
||||
|
||||
// Disconnect from the specified server.
|
||||
bool Disconnect(const std::string &dst_url, size_t timeout_in_sec = 5);
|
||||
|
|
|
@ -345,61 +345,69 @@ ssize_t TCPComm::Send(MessageBase *msg, bool sync) {
|
|||
}
|
||||
}
|
||||
|
||||
void TCPComm::Connect(const std::string &dst_url) {
|
||||
(void)recv_event_loop_->AddTask([dst_url, this] {
|
||||
std::lock_guard<std::mutex> lock(*conn_mutex_);
|
||||
bool TCPComm::Connect(const std::string &dst_url) {
|
||||
std::lock_guard<std::mutex> lock(*conn_mutex_);
|
||||
|
||||
// Search connection by the target address
|
||||
Connection *conn = conn_pool_->FindConnection(dst_url);
|
||||
// Search connection by the target address
|
||||
Connection *conn = conn_pool_->FindConnection(dst_url);
|
||||
|
||||
if (conn == nullptr) {
|
||||
MS_LOG(INFO) << "Can not found link destination: " << dst_url;
|
||||
conn = new (std::nothrow) Connection();
|
||||
if (conn == nullptr) {
|
||||
MS_LOG(INFO) << "Can not found link destination: " << dst_url;
|
||||
conn = new (std::nothrow) Connection();
|
||||
if (conn == nullptr) {
|
||||
MS_LOG(ERROR) << "Failed to create new connection and link fail destination: " << dst_url;
|
||||
return false;
|
||||
}
|
||||
conn->recv_event_loop = this->recv_event_loop_;
|
||||
conn->send_event_loop = this->send_event_loop_;
|
||||
conn->conn_mutex = conn_mutex_;
|
||||
conn->message_handler = message_handler_;
|
||||
conn->InitSocketOperation();
|
||||
|
||||
// Create the client socket.
|
||||
SocketAddress addr;
|
||||
if (!SocketOperation::GetSockAddr(dst_url, &addr)) {
|
||||
MS_LOG(ERROR) << "Failed to get socket address to dest url " << dst_url;
|
||||
return false;
|
||||
}
|
||||
int sock_fd = SocketOperation::CreateSocket(addr.sa.sa_family);
|
||||
if (sock_fd < 0) {
|
||||
MS_LOG(ERROR) << "Failed to create client tcp socket to dest url " << dst_url;
|
||||
return false;
|
||||
}
|
||||
|
||||
conn->socket_fd = sock_fd;
|
||||
conn->event_callback = TCPComm::EventCallBack;
|
||||
conn->write_callback = TCPComm::WriteCallBack;
|
||||
conn->read_callback = TCPComm::ReadCallBack;
|
||||
|
||||
int ret = TCPComm::DoConnect(conn, (struct sockaddr *)&addr, sizeof(addr));
|
||||
if (ret < 0) {
|
||||
MS_LOG(ERROR) << "Failed to do connect and link fail destination: " << dst_url;
|
||||
if (conn->socket_operation != nullptr) {
|
||||
delete conn->socket_operation;
|
||||
conn->socket_operation = nullptr;
|
||||
}
|
||||
delete conn;
|
||||
return false;
|
||||
}
|
||||
conn->source = SocketOperation::GetLocalIP() + ":" + std::to_string(SocketOperation::GetPort(sock_fd));
|
||||
conn->destination = dst_url;
|
||||
conn_pool_->AddConnection(conn);
|
||||
MS_LOG(ERROR) << "Failed to create new connection and link fail destination: " << dst_url;
|
||||
return false;
|
||||
}
|
||||
conn_pool_->AddConnInfo(conn->socket_fd, dst_url, nullptr);
|
||||
MS_LOG(INFO) << "Connected to destination: " << dst_url;
|
||||
return true;
|
||||
});
|
||||
conn->recv_event_loop = this->recv_event_loop_;
|
||||
conn->send_event_loop = this->send_event_loop_;
|
||||
conn->conn_mutex = conn_mutex_;
|
||||
conn->message_handler = message_handler_;
|
||||
conn->InitSocketOperation();
|
||||
|
||||
// Create the client socket.
|
||||
SocketAddress addr;
|
||||
if (!SocketOperation::GetSockAddr(dst_url, &addr)) {
|
||||
MS_LOG(ERROR) << "Failed to get socket address to dest url " << dst_url;
|
||||
return false;
|
||||
}
|
||||
int sock_fd = SocketOperation::CreateSocket(addr.sa.sa_family);
|
||||
if (sock_fd < 0) {
|
||||
MS_LOG(ERROR) << "Failed to create client tcp socket to dest url " << dst_url;
|
||||
return false;
|
||||
}
|
||||
|
||||
conn->socket_fd = sock_fd;
|
||||
conn->event_callback = TCPComm::EventCallBack;
|
||||
conn->write_callback = TCPComm::WriteCallBack;
|
||||
conn->read_callback = TCPComm::ReadCallBack;
|
||||
|
||||
int ret = TCPComm::DoConnect(conn, (struct sockaddr *)&addr, sizeof(addr));
|
||||
if (ret < 0) {
|
||||
MS_LOG(ERROR) << "Failed to do connect and link fail destination: " << dst_url;
|
||||
if (conn->socket_operation != nullptr) {
|
||||
delete conn->socket_operation;
|
||||
conn->socket_operation = nullptr;
|
||||
}
|
||||
delete conn;
|
||||
return false;
|
||||
}
|
||||
conn->source = SocketOperation::GetLocalIP() + ":" + std::to_string(SocketOperation::GetPort(sock_fd));
|
||||
conn->destination = dst_url;
|
||||
|
||||
// Check the state of this new created connection.
|
||||
uint32_t interval = 3;
|
||||
while (conn->state < ConnectionState::kConnected) {
|
||||
MS_LOG(WARNING) << "Waiting for the state of the connection to " << dst_url << " to be connected...";
|
||||
sleep(interval);
|
||||
}
|
||||
if (conn->state != ConnectionState::kConnected) {
|
||||
return false;
|
||||
}
|
||||
conn_pool_->AddConnection(conn);
|
||||
}
|
||||
conn_pool_->AddConnInfo(conn->socket_fd, dst_url, nullptr);
|
||||
MS_LOG(INFO) << "Connected to destination: " << dst_url;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TCPComm::IsConnected(const std::string &dst_url) {
|
||||
|
@ -422,11 +430,8 @@ bool TCPComm::Disconnect(const std::string &dst_url) {
|
|||
<< ", because there are still pending tasks to be executed, please try later.";
|
||||
return false;
|
||||
}
|
||||
(void)recv_event_loop_->AddTask([dst_url, this] {
|
||||
std::lock_guard<std::mutex> lock(*conn_mutex_);
|
||||
conn_pool_->DeleteConnection(dst_url);
|
||||
return true;
|
||||
});
|
||||
std::lock_guard<std::mutex> lock(*conn_mutex_);
|
||||
conn_pool_->DeleteConnection(dst_url);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ class TCPComm {
|
|||
bool StartServerSocket();
|
||||
|
||||
// Connection operation for a specified destination.
|
||||
void Connect(const std::string &dst_url);
|
||||
bool Connect(const std::string &dst_url);
|
||||
bool IsConnected(const std::string &dst_url);
|
||||
bool Disconnect(const std::string &dst_url);
|
||||
|
||||
|
|
Loading…
Reference in New Issue