Added fdbrpc/SimExternalClient unit test

This commit is contained in:
sfc-gh-tclinkenbeard 2020-12-23 19:54:01 -04:00
parent 201857541d
commit 555c3d95fc
2 changed files with 68 additions and 2 deletions

View File

@ -23,7 +23,10 @@
#include <chrono>
#include <thread>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/SimExternalConnection.h"
#include "flow/Net2Packet.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace boost::asio;
@ -47,6 +50,9 @@ Future<Void> SimExternalConnection::onWritable() {
}
Future<Void> SimExternalConnection::onReadable() {
if (!readBuffer.empty()) {
return Void();
}
return onReadableTrigger.onTrigger();
}
@ -126,9 +132,67 @@ Future<Reference<IConnection>> SimExternalConnection::connect(NetworkAddress toA
} else {
address = boost::asio::ip::address_v4(ip.toV4());
}
socket.connect(ip::tcp::endpoint(address, toAddr.port));
return Reference<IConnection>(new SimExternalConnection(std::move(socket)));
boost::system::error_code err;
socket.connect(ip::tcp::endpoint(address, toAddr.port), err);
if (err) {
return Reference<IConnection>();
} else {
return Reference<IConnection>(new SimExternalConnection(std::move(socket)));
}
}
SimExternalConnection::SimExternalConnection(ip::tcp::socket&& socket)
: socket(std::move(socket)), dbgid(deterministicRandom()->randomUniqueID()) {}
static constexpr auto testEchoServerPort = 8000;
static void testEchoServer() {
io_service ios;
ip::tcp::acceptor acceptor(ios, ip::tcp::endpoint(ip::tcp::v4(), testEchoServerPort));
ip::tcp::socket socket(ios);
acceptor.accept(socket);
loop {
char data[10000];
boost::system::error_code err;
auto length = socket.read_some(buffer(data), err);
if (err == boost::asio::error::eof) {
return;
}
ASSERT(!err);
write(socket, buffer(data, length));
}
}
TEST_CASE("fdbrpc/SimExternalClient") {
state const size_t maxDataLength = 10000;
state std::thread serverThread([] { return testEchoServer(); });
state UnsentPacketQueue packetQueue;
state Reference<IConnection> externalConn;
state const IPAddress localhost = IPAddress::parse("127.0.0.1").get();
loop {
// Wait until server is ready
Reference<IConnection> _externalConn =
wait(INetworkConnections::net()->connectExternal(NetworkAddress(localhost, testEchoServerPort)));
if (_externalConn.isValid()) {
externalConn = std::move(_externalConn);
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
state Key data = deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, maxDataLength + 1));
PacketWriter packetWriter(packetQueue.getWriteBuffer(data.size()), nullptr, Unversioned());
packetWriter.serializeBytes(data);
wait(externalConn->onWritable());
externalConn->write(packetQueue.getUnsent());
wait(externalConn->onReadable());
std::vector<uint8_t> vec(data.size());
externalConn->read(&vec[0], &vec[0] + vec.size());
externalConn->close();
StringRef echo(&vec[0], vec.size());
ASSERT(echo.toString() == data.toString());
serverThread.join();
return Void();
}
void forceLinkSimExternalConnectionTests() {}

View File

@ -28,6 +28,7 @@ void forceLinkFlowTests();
void forceLinkVersionedMapTests();
void forceLinkMemcpyTests();
void forceLinkMemcpyPerfTests();
void forceLinkSimExternalConnectionTests();
struct UnitTestWorkload : TestWorkload {
bool enabled;
@ -49,6 +50,7 @@ struct UnitTestWorkload : TestWorkload {
forceLinkVersionedMapTests();
forceLinkMemcpyTests();
forceLinkMemcpyPerfTests();
forceLinkSimExternalConnectionTests();
}
std::string description() const override { return "UnitTests"; }