[lldb] Add a gdb_remote_client test for connecting to pty

Add a minimal mock server utilizing a pty, and add a client test
connecting to that server.

Differential Revision: https://reviews.llvm.org/D110878
This commit is contained in:
Michał Górny 2021-09-30 22:30:30 +02:00
parent 08b63db8bb
commit 8fa2394bad
4 changed files with 169 additions and 38 deletions

View File

@ -50,7 +50,7 @@ class TestPlatformClient(GDBRemoteTestBase):
try:
self.runCmd("platform select remote-linux")
self.runCmd("platform connect connect://" + self.server.get_connect_address())
self.runCmd("platform connect " + self.server.get_connect_url())
self.assertTrue(self.dbg.GetSelectedPlatform().IsConnected())
self.expect("platform process list -x",
substrs=["2 matching processes were found", "test_process", "another_test_process"])
@ -84,8 +84,8 @@ class TestPlatformClient(GDBRemoteTestBase):
self.runCmd("settings set plugin.process.gdb-remote.packet-timeout 30")
plat = lldb.SBPlatform("remote-linux")
try:
self.assertSuccess(plat.ConnectRemote(lldb.SBPlatformConnectOptions("connect://"
+ self.server.get_connect_address())))
self.assertSuccess(plat.ConnectRemote(lldb.SBPlatformConnectOptions(
self.server.get_connect_url())))
self.assertEqual(plat.GetWorkingDirectory(), "/foo/bar")
finally:
plat.DisconnectRemote()
@ -98,8 +98,8 @@ class TestPlatformClient(GDBRemoteTestBase):
self.runCmd("settings set plugin.process.gdb-remote.packet-timeout 3")
plat = lldb.SBPlatform("remote-linux")
try:
self.assertSuccess(plat.ConnectRemote(lldb.SBPlatformConnectOptions("connect://"
+ self.server.get_connect_address())))
self.assertSuccess(plat.ConnectRemote(lldb.SBPlatformConnectOptions(
self.server.get_connect_url())))
self.assertIsNone(plat.GetWorkingDirectory())
finally:
plat.DisconnectRemote()

View File

@ -39,8 +39,7 @@ class TestProcessConnect(GDBRemoteTestBase):
self.dbg.SetAsync(False)
self.expect("platform select remote-gdb-server",
substrs=['Platform: remote-gdb-server', 'Connected: no'])
self.expect("process connect connect://" +
self.server.get_connect_address(),
self.expect("process connect " + self.server.get_connect_url(),
substrs=['Process', 'stopped'])
finally:
self.dbg.GetSelectedPlatform().DisconnectRemote()
@ -52,8 +51,7 @@ class TestProcessConnect(GDBRemoteTestBase):
self.dbg.SetAsync(True)
self.expect("platform select remote-gdb-server",
substrs=['Platform: remote-gdb-server', 'Connected: no'])
self.expect("process connect connect://" +
self.server.get_connect_address(),
self.expect("process connect " + self.server.get_connect_url(),
matching=False,
substrs=['Process', 'stopped'])
lldbutil.expect_state_changes(self, self.dbg.GetListener(),

View File

@ -0,0 +1,35 @@
import lldb
from lldbsuite.test.lldbtest import *
from lldbsuite.test.decorators import *
from gdbclientutils import *
@skipIfWindows
class TestPty(GDBRemoteTestBase):
mydir = TestBase.compute_mydir(__file__)
server_socket_class = PtyServerSocket
def test_process_connect_sync(self):
"""Test the process connect command in synchronous mode"""
try:
self.dbg.SetAsync(False)
self.expect("platform select remote-gdb-server",
substrs=['Platform: remote-gdb-server', 'Connected: no'])
self.expect("process connect " + self.server.get_connect_url(),
substrs=['Process', 'stopped'])
finally:
self.dbg.GetSelectedPlatform().DisconnectRemote()
def test_process_connect_async(self):
"""Test the process connect command in asynchronous mode"""
try:
self.dbg.SetAsync(True)
self.expect("platform select remote-gdb-server",
substrs=['Platform: remote-gdb-server', 'Connected: no'])
self.expect("process connect " + self.server.get_connect_url(),
matching=False,
substrs=['Process', 'stopped'])
lldbutil.expect_state_changes(self, self.dbg.GetListener(),
self.process(), [lldb.eStateStopped])
finally:
self.dbg.GetSelectedPlatform().DisconnectRemote()

View File

@ -1,8 +1,12 @@
import ctypes
import errno
import io
import os
import os.path
import pty
import threading
import socket
import tty
import lldb
import binascii
import traceback
@ -332,6 +336,110 @@ class MockGDBServerResponder:
pass
class ServerSocket:
"""
A wrapper class for TCP or pty-based server.
"""
def get_connect_address(self):
"""Get address for the client to connect to."""
def get_connect_url(self):
"""Get URL suitable for process connect command."""
def close_server(self):
"""Close all resources used by the server."""
def accept(self):
"""Accept a single client connection to the server."""
def close_connection(self):
"""Close all resources used by the accepted connection."""
def recv(self):
"""Receive a data packet from the connected client."""
def sendall(self, data):
"""Send the data to the connected client."""
class TCPServerSocket(ServerSocket):
def __init__(self):
family, type, proto, _, addr = socket.getaddrinfo(
"localhost", 0, proto=socket.IPPROTO_TCP)[0]
self._server_socket = socket.socket(family, type, proto)
self._connection = None
self._server_socket.bind(addr)
self._server_socket.listen(1)
def get_connect_address(self):
return "[{}]:{}".format(*self._server_socket.getsockname())
def get_connect_url(self):
return "connect://" + self.get_connect_address()
def close_server(self):
self._server_socket.close()
def accept(self):
assert self._connection is None
# accept() is stubborn and won't fail even when the socket is
# shutdown, so we'll use a timeout
self._server_socket.settimeout(30.0)
client, client_addr = self._server_socket.accept()
# The connected client inherits its timeout from self._socket,
# but we'll use a blocking socket for the client
client.settimeout(None)
self._connection = client
def close_connection(self):
assert self._connection is not None
self._connection.close()
self._connection = None
def recv(self):
assert self._connection is not None
return self._connection.recv(4096)
def sendall(self, data):
assert self._connection is not None
return self._connection.sendall(data)
class PtyServerSocket(ServerSocket):
def __init__(self):
master, slave = pty.openpty()
tty.setraw(master)
self._master = io.FileIO(master, 'r+b')
self._slave = io.FileIO(slave, 'r+b')
def get_connect_address(self):
libc = ctypes.CDLL(None)
libc.ptsname.argtypes = (ctypes.c_int,)
libc.ptsname.restype = ctypes.c_char_p
return libc.ptsname(self._master.fileno()).decode()
def get_connect_url(self):
return "file://" + self.get_connect_address()
def close_server(self):
self._slave.close()
self._master.close()
def recv(self):
try:
return self._master.read(4096)
except OSError as e:
# closing the pty results in EIO on Linux, convert it to EOF
if e.errno == errno.EIO:
return b''
raise
def sendall(self, data):
return self._master.write(data)
class MockGDBServer:
"""
A simple TCP-based GDB server that can test client behavior by receiving
@ -343,48 +451,37 @@ class MockGDBServer:
responder = None
_socket = None
_client = None
_thread = None
_receivedData = None
_receivedDataOffset = None
_shouldSendAck = True
def __init__(self):
def __init__(self, socket_class):
self._socket_class = socket_class
self.responder = MockGDBServerResponder()
def start(self):
family, type, proto, _, addr = socket.getaddrinfo("localhost", 0,
proto=socket.IPPROTO_TCP)[0]
self._socket = socket.socket(family, type, proto)
self._socket.bind(addr)
self._socket.listen(1)
self._socket = self._socket_class()
# Start a thread that waits for a client connection.
self._thread = threading.Thread(target=self._run)
self._thread.start()
def stop(self):
self._socket.close()
self._socket.close_server()
self._thread.join()
self._thread = None
def get_connect_address(self):
return "[{}]:{}".format(*self._socket.getsockname())
return self._socket.get_connect_address()
def get_connect_url(self):
return self._socket.get_connect_url()
def _run(self):
# For testing purposes, we only need to worry about one client
# connecting just one time.
try:
# accept() is stubborn and won't fail even when the socket is
# shutdown, so we'll use a timeout
self._socket.settimeout(30.0)
client, client_addr = self._socket.accept()
self._client = client
# The connected client inherits its timeout from self._socket,
# but we'll use a blocking socket for the client
self._client.settimeout(None)
self._socket.accept()
except:
return
self._shouldSendAck = True
@ -393,14 +490,14 @@ class MockGDBServer:
data = None
while True:
try:
data = seven.bitcast_to_string(self._client.recv(4096))
data = seven.bitcast_to_string(self._socket.recv())
if data is None or len(data) == 0:
break
self._receive(data)
except Exception as e:
print("An exception happened when receiving the response from the gdb server. Closing the client...")
traceback.print_exc()
self._client.close()
self._socket.close_connection()
break
def _receive(self, data):
@ -415,7 +512,7 @@ class MockGDBServer:
self._handlePacket(packet)
packet = self._parsePacket()
except self.InvalidPacketException:
self._client.close()
self._socket.close_connection()
def _parsePacket(self):
"""
@ -492,7 +589,7 @@ class MockGDBServer:
# We'll handle the ack stuff here since it's not something any of the
# tests will be concerned about, and it'll get turned off quickly anyway.
if self._shouldSendAck:
self._client.sendall(seven.bitcast_to_bytes('+'))
self._socket.sendall(seven.bitcast_to_bytes('+'))
if packet == "QStartNoAckMode":
self._shouldSendAck = False
response = "OK"
@ -502,7 +599,7 @@ class MockGDBServer:
# Handle packet framing since we don't want to bother tests with it.
if response is not None:
framed = frame_packet(response)
self._client.sendall(seven.bitcast_to_bytes(framed))
self._socket.sendall(seven.bitcast_to_bytes(framed))
PACKET_ACK = object()
PACKET_INTERRUPT = object()
@ -510,6 +607,7 @@ class MockGDBServer:
class InvalidPacketException(Exception):
pass
class GDBRemoteTestBase(TestBase):
"""
Base class for GDB client tests.
@ -522,10 +620,11 @@ class GDBRemoteTestBase(TestBase):
NO_DEBUG_INFO_TESTCASE = True
mydir = TestBase.compute_mydir(__file__)
server = None
server_socket_class = TCPServerSocket
def setUp(self):
TestBase.setUp(self)
self.server = MockGDBServer()
self.server = MockGDBServer(socket_class=self.server_socket_class)
self.server.start()
def tearDown(self):
@ -559,7 +658,7 @@ class GDBRemoteTestBase(TestBase):
listener = self.dbg.GetListener()
error = lldb.SBError()
process = target.ConnectRemote(listener,
"connect://" + self.server.get_connect_address(), "gdb-remote", error)
self.server.get_connect_url(), "gdb-remote", error)
self.assertTrue(error.Success(), error.description)
self.assertTrue(process, PROCESS_IS_VALID)
return process
@ -599,8 +698,7 @@ class GDBPlatformClientTestBase(GDBRemoteTestBase):
def setUp(self):
super().setUp()
self.runCmd("platform select remote-gdb-server")
self.runCmd("platform connect connect://" +
self.server.get_connect_address())
self.runCmd("platform connect " + self.server.get_connect_url())
self.assertTrue(self.dbg.GetSelectedPlatform().IsConnected())
def tearDown(self):