Set cb in tcp comm

This commit is contained in:
ZPaC 2022-09-24 09:38:01 +08:00
parent 685276d722
commit 9ef032bc45
3 changed files with 7 additions and 23 deletions

View File

@ -59,9 +59,8 @@ void TCPClient::Finalize() {
bool TCPClient::Connect(const std::string &dst_url, size_t retry_count, const MemFreeCallback &free_cb) {
unsigned int interval = 5;
for (size_t i = 0; i < retry_count; ++i) {
if (tcp_comm_->Connect(dst_url)) {
if (tcp_comm_->Connect(dst_url, free_cb)) {
MS_LOG(INFO) << "Connected to the tcp server " << dst_url << " successfully.";
tcp_comm_->SetMessageFreeCallback(dst_url, free_cb);
return true;
} else {
MS_LOG(WARNING) << "Failed to connect to the tcp server : " << dst_url << ", retry to reconnect(" << (i + 1)

View File

@ -225,17 +225,6 @@ bool TCPComm::StartServerSocket(const MemAllocateCallback &allocate_cb) {
int TCPComm::GetServerFd() const { return server_fd_; }
void TCPComm::SetMessageFreeCallback(const std::string &dst_url, const MemFreeCallback &free_cb) {
Connection *conn = conn_pool_->FindConnection(dst_url);
if (conn == nullptr) {
MS_LOG(EXCEPTION) << "Can not find the connection to url: " << dst_url;
}
if (!free_cb) {
MS_LOG(EXCEPTION) << "The message callback is empty.";
}
return conn->SetMessageFreeCallback(free_cb);
}
void TCPComm::ReadCallBack(void *connection) {
const int max_recv_count = 3;
Connection *conn = reinterpret_cast<Connection *>(connection);
@ -407,9 +396,12 @@ bool TCPComm::Flush(const std::string &dst_url) {
}
}
bool TCPComm::Connect(const std::string &dst_url) {
bool TCPComm::Connect(const std::string &dst_url, const MemFreeCallback &free_cb) {
MS_EXCEPTION_IF_NULL(conn_mutex_);
MS_EXCEPTION_IF_NULL(conn_pool_);
if (!free_cb) {
MS_LOG(EXCEPTION) << "The message callback is empty.";
}
std::lock_guard<std::mutex> lock(*conn_mutex_);
@ -472,6 +464,7 @@ bool TCPComm::Connect(const std::string &dst_url) {
return false;
}
conn_pool_->AddConnection(conn);
conn->SetMessageFreeCallback(free_cb);
}
conn_pool_->AddConnInfo(conn->socket_fd, dst_url, nullptr);
MS_LOG(INFO) << "Connected to destination: " << dst_url;

View File

@ -58,7 +58,7 @@ class TCPComm {
bool StartServerSocket(const MemAllocateCallback &allocate_cb);
// Connection operation for a specified destination.
bool Connect(const std::string &dst_url);
bool Connect(const std::string &dst_url, const MemFreeCallback &free_cb);
bool IsConnected(const std::string &dst_url);
bool Disconnect(const std::string &dst_url);
@ -75,14 +75,6 @@ class TCPComm {
// Get the file descriptor of server socket.
int GetServerFd() const;
/**
* @description: Set callback to free message for the connection specified by url.
* @param {string} &dst_url: The connection url.
* @param {MemFreeCallback} &free_cb: The callback which frees the real memory after message is sent to peer.
* @return {void}
*/
void SetMessageFreeCallback(const std::string &dst_url, const MemFreeCallback &free_cb);
/**
* @description: Returns the allocating callback.
* @return {const MemAllocateCallback &}