!31578 Fix exception for sending big message using TCP client.

Merge pull request !31578 from chengang/fix_rpc_big_msg
This commit is contained in:
i-robot 2022-03-22 01:27:29 +00:00 committed by Gitee
commit 3b24d825bf
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 48 additions and 5 deletions

View File

@ -84,7 +84,7 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re
reinterpret_cast<char *>(recvMsg->msg_iov[i].iov_base) + static_cast<unsigned int>(retval) - tmpLen;
recvMsg->msg_iov = &recvMsg->msg_iov[i];
recvMsg->msg_iovlen -= (i + 1);
recvMsg->msg_iovlen -= i;
break;
}
}

View File

@ -75,16 +75,17 @@ class TCPTest : public UT::Common {
void SetUp() {}
void TearDown() {}
std::unique_ptr<MessageBase> CreateMessage(const std::string &serverUrl, const std::string &client_url);
std::unique_ptr<MessageBase> CreateMessage(const std::string &serverUrl, const std::string &client_url,
size_t msg_size = 100);
bool CheckRecvNum(int expectedRecvNum, int _timeout);
bool CheckExitNum(int expectedExitNum, int _timeout);
};
std::unique_ptr<MessageBase> TCPTest::CreateMessage(const std::string &serverUrl, const std::string &clientUrl) {
std::unique_ptr<MessageBase> TCPTest::CreateMessage(const std::string &serverUrl, const std::string &clientUrl,
size_t msg_size) {
std::unique_ptr<MessageBase> message = std::make_unique<MessageBase>();
size_t len = 100;
std::string data(len, 'A');
std::string data(msg_size, 'A');
message->name = "testname";
message->from = AID("client", clientUrl);
message->to = AID("server", serverUrl);
@ -270,6 +271,48 @@ TEST_F(TCPTest, SendSyncMessage) {
client->Finalize();
server->Finalize();
}
/// Feature: test sending large messages.
/// Description: start a socket server and send several large messages to it.
/// Expectation: the server received these large messages sented from client.
TEST_F(TCPTest, SendLargeMessages) {
Init();
// Start the tcp server.
auto server_url = "127.0.0.1:8081";
std::unique_ptr<TCPServer> server = std::make_unique<TCPServer>();
bool ret = server->Initialize(server_url);
ASSERT_TRUE(ret);
server->SetMessageHandler([](const std::shared_ptr<MessageBase> &message) -> void { IncrDataMsgNum(1); });
// Start the tcp client.
auto client_url = "127.0.0.1:1234";
std::unique_ptr<TCPClient> client = std::make_unique<TCPClient>();
ret = client->Initialize();
ASSERT_TRUE(ret);
// Send the message.
client->Connect(server_url);
size_t msg_cnt = 100;
size_t large_msg_size = 102400;
for (int i = 0; i < msg_cnt; ++i) {
auto message = CreateMessage(server_url, client_url, large_msg_size);
client->SendAsync(std::move(message));
}
// Wait timeout: 15s
WaitForDataMsg(msg_cnt, 15);
// Check result
EXPECT_EQ(msg_cnt, GetDataMsgNum());
// Destroy
client->Disconnect(server_url);
client->Finalize();
server->Finalize();
}
} // namespace rpc
} // namespace distributed
} // namespace mindspore