"""
|
|
Base class for gdb-remote test cases.
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
|
|
|
|
import errno
|
|
import os
|
|
import os.path
|
|
import platform
|
|
import random
|
|
import re
|
|
import select
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from lldbsuite.test import configuration
|
|
from lldbsuite.test.lldbtest import *
|
|
from lldbgdbserverutils import *
|
|
import logging
|
|
|
|
|
|
class _ConnectionRefused(IOError):
    pass


class GdbRemoteTestCaseBase(TestBase):

    NO_DEBUG_INFO_TESTCASE = True

    _TIMEOUT_SECONDS = 120

    _GDBREMOTE_KILL_PACKET = "$k#6b"

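    # A gdb-remote packet is framed as "$<payload>#<checksum>", where the
    # checksum is the sum of the payload bytes modulo 256, rendered as two
    # lowercase hex digits. For the kill packet above, the payload is the
    # single byte 'k' (0x6b), so the checksum is "6b". Illustrative sketch,
    # not used by the tests themselves:
    #
    #   def gdbremote_checksum(payload):
    #       return "{:02x}".format(sum(ord(c) for c in payload) % 256)
    #
    #   gdbremote_checksum("k")               # -> "6b"
    #   gdbremote_checksum("QStartNoAckMode") # -> "b0"
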
    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB signal numbers used for common exceptions; these are not
    # target-specific.
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    _verbose_log_handler = None
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')

    def setUpBaseLogging(self):
        self.logger = logging.getLogger(__name__)

        if len(self.logger.handlers) > 0:
            return  # We have set up this handler already.

        self.logger.propagate = False
        self.logger.setLevel(logging.DEBUG)

        # Log all warnings to stderr.
        handler = logging.StreamHandler()
        handler.setLevel(logging.WARNING)
        handler.setFormatter(self._log_formatter)
        self.logger.addHandler(handler)

    def isVerboseLoggingRequested(self):
        # We will emit detailed logs if the user requested logging for the
        # "gdb-remote" channel.
        return any(("gdb-remote" in channel)
                   for channel in lldbtest_config.channels)

    def setUp(self):
        TestBase.setUp(self)

        self.setUpBaseLogging()
        self.debug_monitor_extra_args = []
        self._pump_queues = socket_packet_pump.PumpQueues()

        if self.isVerboseLoggingRequested():
            # If requested, full logs go to a log file.
            self._verbose_log_handler = logging.FileHandler(
                self.log_basename + "-host.log")
            self._verbose_log_handler.setFormatter(self._log_formatter)
            self._verbose_log_handler.setLevel(logging.DEBUG)
            self.logger.addHandler(self._verbose_log_handler)

        self.test_sequence = GdbRemoteTestSequence(self.logger)
        self.set_inferior_startup_launch()
        self.port = self.get_next_port()
        self.named_pipe_path = None
        self.named_pipe = None
        self.named_pipe_fd = None
        self.stub_sends_two_stop_notifications_on_kill = False
        if configuration.lldb_platform_url:
            if configuration.lldb_platform_url.startswith('unix-'):
                url_pattern = r'(.+)://\[?(.+?)\]?/.*'
            else:
                url_pattern = r'(.+)://(.+):\d+'
            scheme, host = re.match(
                url_pattern, configuration.lldb_platform_url).groups()
            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
                self.stub_device = host
                self.stub_hostname = 'localhost'
            else:
                self.stub_device = None
                self.stub_hostname = host
        else:
            self.stub_hostname = "localhost"

    def tearDown(self):
        self._pump_queues.verify_queues_empty()

        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        TestBase.tearDown(self)

    def getLocalServerLogFile(self):
        return self.log_basename + "-server.log"

    def setUpServerLogging(self, is_llgs):
        if len(lldbtest_config.channels) == 0:
            return  # No logging requested.

        if lldb.remote_platform:
            log_file = lldbutil.join_remote_paths(
                lldb.remote_platform.GetWorkingDirectory(), "server.log")
        else:
            log_file = self.getLocalServerLogFile()

        if is_llgs:
            self.debug_monitor_extra_args.append("--log-file=" + log_file)
            self.debug_monitor_extra_args.append(
                "--log-channels={}".format(":".join(lldbtest_config.channels)))
        else:
            self.debug_monitor_extra_args = [
                "--log-file=" + log_file, "--log-flags=0x800000"]

    def get_next_port(self):
        return 12000 + random.randint(0, 3999)

    def reset_test_sequence(self):
        self.test_sequence = GdbRemoteTestSequence(self.logger)

    def create_named_pipe(self):
        # Create a temp dir and name for a pipe.
        temp_dir = tempfile.mkdtemp()
        named_pipe_path = os.path.join(temp_dir, "stub_port_number")

        # Create the named pipe.
        os.mkfifo(named_pipe_path)

        # Open the read side of the pipe in non-blocking mode. This will
        # return right away, ready or not.
        named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)

        # Create the file object for the named pipe. Note this will follow
        # the semantics of a non-blocking read side of a named pipe, which
        # are different from those of a named pipe opened for read in
        # non-blocking mode.
        named_pipe = os.fdopen(named_pipe_fd, "r")
        self.assertIsNotNone(named_pipe)

        def shutdown_named_pipe():
            # Close the pipe.
            try:
                named_pipe.close()
            except:
                print("failed to close named pipe")

            # Delete the pipe.
            try:
                os.remove(named_pipe_path)
            except:
                print("failed to delete named pipe: {}".format(named_pipe_path))

            # Delete the temp directory.
            try:
                os.rmdir(temp_dir)
            except:
                print(
                    "failed to delete temp dir: {}, directory contents: '{}'".format(
                        temp_dir, os.listdir(temp_dir)))

        # Add the shutdown hook to clean up the named pipe.
        self.addTearDownHook(shutdown_named_pipe)

        # Clear the port so the stub selects a port number.
        self.port = 0

        return (named_pipe_path, named_pipe, named_pipe_fd)

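    # Port negotiation over the named pipe: the stub is launched with
    # "--named-pipe <path>" while self.port == 0, picks a free port itself,
    # then writes the port number followed by a NUL byte to the pipe.
    # get_stub_port_from_named_socket() below reads it back; e.g. raw pipe
    # content "59715\0" yields stub_port == 59715 (port value illustrative).
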
    def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
        # Wait for something to read with a max timeout.
        (ready_readers, _, _) = select.select(
            [self.named_pipe_fd], [], [], read_timeout_seconds)
        self.assertIsNotNone(
            ready_readers,
            "write side of pipe has not written anything - stub isn't writing to pipe.")
        self.assertNotEqual(
            len(ready_readers),
            0,
            "write side of pipe has not written anything - stub isn't writing to pipe.")

        # Read the port from the named pipe.
        stub_port_raw = self.named_pipe.read()
        self.assertIsNotNone(stub_port_raw)
        self.assertNotEqual(
            len(stub_port_raw), 0, "no content to read on pipe")

        # Trim the null byte, convert to int.
        stub_port_raw = stub_port_raw[:-1]
        stub_port = int(stub_port_raw)
        self.assertTrue(stub_port > 0)

        return stub_port

    def init_llgs_test(self, use_named_pipe=True):
        if lldb.remote_platform:
            # Remote platforms don't support named pipe based port
            # negotiation.
            use_named_pipe = False

            # Grab the ppid from /proc/[shell pid]/stat.
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)"
            # appended. Remove it if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe,
             self.named_pipe_fd) = self.create_named_pipe()

    def init_debugserver_test(self, use_named_pipe=True):
        self.debug_monitor_exe = get_debugserver_exe()
        if not self.debug_monitor_exe:
            self.skipTest("debugserver exe not found")
        self.setUpServerLogging(is_llgs=False)
        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe,
             self.named_pipe_fd) = self.create_named_pipe()
        # The debugserver stub has a race on handling the 'k' command, so it
        # sends an X09 right away, then sends the real X notification when
        # the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True

    def forward_adb_port(self, source, target, direction, device):
        adb = ['adb'] + (['-s', device] if device else []) + [direction]

        def remove_port_forward():
            subprocess.call(adb + ["--remove", "tcp:%d" % source])

        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
        self.addTearDownHook(remove_port_forward)

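    # For example (illustrative values), forward_adb_port(5555, 5555,
    # "forward", "emulator-5554") runs:
    #   adb -s emulator-5554 forward tcp:5555 tcp:5555
    # and registers a teardown hook that runs:
    #   adb -s emulator-5554 forward --remove tcp:5555
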
    def _verify_socket(self, sock):
        # Normally, when the remote stub is not ready, we will get
        # ECONNREFUSED during the connect() attempt. However, due to the way
        # ADB forwarding works, on android targets the connect() will always
        # succeed, but the connection will be immediately dropped if ADB
        # could not connect on the remote side. This function tries to detect
        # this situation and reports it as "connection refused" so that the
        # upper layers attempt the connection again.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if not re.match(".*-.*-.*-android", triple):
            return  # Not android.
        can_read, _, _ = select.select([sock], [], [], 0.1)
        if sock not in can_read:
            return  # Data is not available, but the connection is alive.
        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
            raise _ConnectionRefused()  # Got EOF, connection dropped.

    def create_socket(self):
        sock = socket.socket()
        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            if sock:
                try:
                    # Send the kill packet so lldb-server shuts down
                    # gracefully.
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        self._verify_socket(sock)

        return sock

    def set_inferior_startup_launch(self):
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY

    def get_debug_monitor_command_line_args(self, attach_pid=None):
        if lldb.remote_platform:
            commandline_args = self.debug_monitor_extra_args + \
                ["*:{}".format(self.port)]
        else:
            commandline_args = self.debug_monitor_extra_args + \
                ["127.0.0.1:{}".format(self.port)]

        if attach_pid:
            commandline_args += ["--attach=%d" % attach_pid]
        if self.named_pipe_path:
            commandline_args += ["--named-pipe", self.named_pipe_path]
        return commandline_args

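    # For a local llgs run, the assembled argument list looks something like
    # (port, pid, and pipe path are illustrative values):
    #   gdbserver [--log-file=... --log-channels=...] 127.0.0.1:12345 \
    #       --attach=6789 --named-pipe /tmp/.../stub_port_number
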
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.addTearDownHook(self.cleanupSubprocesses)
        self.assertIsNotNone(server)

        # If we're receiving the stub's listening port from the named pipe,
        # do that here.
        if self.named_pipe:
            self.port = self.get_stub_port_from_named_socket()

        return server

    def connect_to_debug_monitor(self, attach_pid=None):
        if self.named_pipe:
            # Create the stub.
            server = self.launch_debug_monitor(attach_pid=attach_pid)
            self.assertIsNotNone(server)

            # Schedule the debug monitor to be shut down during teardown.
            logger = self.logger

            def shutdown_debug_monitor():
                try:
                    server.terminate()
                except:
                    logger.warning(
                        "failed to terminate server for debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))
            self.addTearDownHook(shutdown_debug_monitor)

            # Attach to the stub and return a socket opened to it.
            self.sock = self.create_socket()
            return server

        # We're using a random port algorithm to try not to collide with
        # other ports, and retry a max number of times.
        attempts = 0
        MAX_ATTEMPTS = 20

        while attempts < MAX_ATTEMPTS:
            server = self.launch_debug_monitor(attach_pid=attach_pid)

            # Schedule the debug monitor to be shut down during teardown.
            logger = self.logger

            def shutdown_debug_monitor():
                try:
                    server.terminate()
                except:
                    logger.warning(
                        "failed to terminate server for debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))
            self.addTearDownHook(shutdown_debug_monitor)

            connect_attempts = 0
            MAX_CONNECT_ATTEMPTS = 10

            while connect_attempts < MAX_CONNECT_ATTEMPTS:
                # Create a socket to talk to the server.
                try:
                    logger.info("Connect attempt %d", connect_attempts + 1)
                    self.sock = self.create_socket()
                    return server
                except _ConnectionRefused:
                    # Ignore, and try again.
                    pass
                time.sleep(0.5)
                connect_attempts += 1

            # We should close the server here to be safe.
            server.terminate()

            # Increment attempts.
            print(
                "connect to debug monitor on port %d failed, attempt #%d of %d" %
                (self.port, attempts + 1, MAX_ATTEMPTS))
            attempts += 1

            # And wait a random length of time before the next attempt, to
            # avoid collisions.
            time.sleep(random.randint(1, 5))

            # Now grab a new port number.
            self.port = self.get_next_port()

        raise Exception(
            "failed to create a socket to the launched debug monitor after %d tries" %
            attempts)

    def launch_process_for_attach(
            self,
            inferior_args=None,
            sleep_seconds=3,
            exe_path=None):
        # We're going to start a child process that the debug monitor stub
        # can later attach to. This process needs to be started so that it
        # just hangs around for a while. We'll have it sleep.
        if not exe_path:
            exe_path = self.getBuildArtifact("a.out")

        args = []
        if inferior_args:
            args.extend(inferior_args)
        if sleep_seconds:
            args.append("sleep:%d" % sleep_seconds)

        inferior = self.spawnSubprocess(exe_path, args)

        # Bind the logger locally so the teardown hook below doesn't hit a
        # NameError when it fires.
        logger = self.logger

        def shutdown_process_for_attach():
            try:
                inferior.terminate()
            except:
                logger.warning(
                    "failed to terminate inferior process for attach: {}; ignoring".format(
                        sys.exc_info()[0]))
        self.addTearDownHook(shutdown_process_for_attach)
        return inferior

    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in
        attach-to-inferior mode and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started,
        then the debug monitor is started in attach-to-pid mode (using
        --attach on the stub command line), and the no-ack-mode setup is
        appended to the packet stream. The packet stream is not yet executed,
        ready to have more expected packet entries added to it.

        For launch-inferior mode, the stub is first started, then no-ack mode
        is set up on the expected packet stream, then the verified launch
        packets are added to the expected socket stream. The packet stream is
        not yet executed, ready to have more expected packet entries added
        to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args.
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                remote_path = lldbutil.append_to_process_working_directory(
                    self, os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream.
        self.add_no_ack_remote_stream()
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}

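    # Typical usage from a test method (illustrative sketch, mirroring the
    # pattern used by the tests later in this file):
    #
    #   procs = self.prep_debug_monitor_and_inferior()
    #   self.test_sequence.add_log_lines(["read packet: $c#63"], True)
    #   context = self.expect_gdbremote_sequence()
    #   self.assertIsNotNone(context)
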
    def expect_socket_recv(
            self,
            sock,
            expected_content_regex,
            timeout_seconds):
        response = ""
        timeout_time = time.time() + timeout_seconds

        while not expected_content_regex.match(
                response) and time.time() < timeout_time:
            can_read, _, _ = select.select([sock], [], [], timeout_seconds)
            if can_read and sock in can_read:
                recv_bytes = sock.recv(4096)
                if recv_bytes:
                    response += recv_bytes

        self.assertTrue(expected_content_regex.match(response))

    def expect_socket_send(self, sock, content, timeout_seconds):
        request_bytes_remaining = content
        timeout_time = time.time() + timeout_seconds

        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
            _, can_write, _ = select.select([], [sock], [], timeout_seconds)
            if can_write and sock in can_write:
                written_byte_count = sock.send(request_bytes_remaining)
                request_bytes_remaining = request_bytes_remaining[
                    written_byte_count:]
        self.assertEqual(len(request_bytes_remaining), 0)

    def do_handshake(self, stub_socket, timeout_seconds=5):
        # Write the ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)

        # Send the start-no-ack-mode packet.
        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))

        # Receive the ack and "OK".
        self.expect_socket_recv(stub_socket, re.compile(
            r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)

        # Send the final ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)

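    # On the wire, the handshake above looks like this (test -> stub on the
    # left, stub -> test on the right):
    #
    #   +                        ->
    #   $QStartNoAckMode#b0      ->
    #                            <- +
    #                            <- $OK#9a
    #   +                        ->
    #
    # After this exchange, neither side sends "+" acks anymore.
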
    def add_no_ack_remote_stream(self):
        self.test_sequence.add_log_lines(
            ["read packet: +",
             "read packet: $QStartNoAckMode#b0",
             "send packet: +",
             "send packet: $OK#9a",
             "read packet: +"],
            True)

    def add_verified_launch_packets(self, launch_args):
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)

    def add_thread_suffix_request_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)

    def add_process_info_collection_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)

    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]

    def parse_process_info_response(self, context):
        # Ensure we have a process info response.
        self.assertIsNotNone(context)
        process_info_raw = context.get("process_info_raw")
        self.assertIsNotNone(process_info_raw)

        # Pull out key:value; pairs.
        process_info_dict = {
            match.group(1): match.group(2) for match in re.finditer(
                r"([^:]+):([^;]+);", process_info_raw)}

        # Validate that the keys are known.
        for (key, val) in list(process_info_dict.items()):
            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
            self.assertIsNotNone(val)

        return process_info_dict

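    # Illustrative qProcessInfo exchange (payload values hypothetical): a raw
    # response such as "pid:2f9a;parent-pid:2f90;ptrsize:8;endian:little;"
    # parses to {"pid": "2f9a", "parent-pid": "2f90", "ptrsize": "8",
    # "endian": "little"}; values stay as raw strings for the caller to
    # interpret.
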
    def add_register_info_collection_packets(self):
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
              "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
              "save_key": "reg_info_responses"}],
            True)

    def parse_register_info_packets(self, context):
        """Return an array of register info dictionaries, one per register info."""
        reg_info_responses = context.get("reg_info_responses")
        self.assertIsNotNone(reg_info_responses)

        # Parse register infos.
        return [parse_reg_info_response(reg_info_response)
                for reg_info_response in reg_info_responses]

    def expect_gdbremote_sequence(self, timeout_seconds=None):
        if not timeout_seconds:
            timeout_seconds = self._TIMEOUT_SECONDS
        return expect_lldb_gdbserver_replay(
            self,
            self.sock,
            self.test_sequence,
            self._pump_queues,
            timeout_seconds,
            self.logger)

    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]

    def assert_valid_reg_info(self, reg_info):
        # Assert we know about all the reginfo keys parsed.
        for key in reg_info:
            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)

        # Check the bare-minimum expected set of register info keys.
        self.assertTrue("name" in reg_info)
        self.assertTrue("bitsize" in reg_info)
        self.assertTrue("offset" in reg_info)
        self.assertTrue("encoding" in reg_info)
        self.assertTrue("format" in reg_info)

    def find_pc_reg_info(self, reg_infos):
        lldb_reg_index = 0
        for reg_info in reg_infos:
            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
                return (lldb_reg_index, reg_info)
            lldb_reg_index += 1

        return (None, None)

    def add_lldb_register_index(self, reg_infos):
        """Add a "lldb_register_index" key containing the 0-based index of each reg_infos entry.

        We'll use this when we want to call packets like P/p with a register
        index but do so on only a subset of the full register info set.
        """
        self.assertIsNotNone(reg_infos)

        reg_index = 0
        for reg_info in reg_infos:
            reg_info["lldb_register_index"] = reg_index
            reg_index += 1

    def add_query_memory_region_packets(self, address):
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)

    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
        self.assertIsNotNone(key_val_text)
        kv_dict = {}
        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
            key = match.group(1)
            val = match.group(2)
            if key in kv_dict:
                if allow_dupes:
                    if isinstance(kv_dict[key], list):
                        kv_dict[key].append(val)
                    else:
                        # Promote to list.
                        kv_dict[key] = [kv_dict[key], val]
                else:
                    self.fail(
                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                            key, val, key_val_text, kv_dict))
            else:
                kv_dict[key] = val
        return kv_dict

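    # For example, parse_key_val_dict("start:1000;size:2000;permissions:rx")
    # returns {"start": "1000", "size": "2000", "permissions": "rx"}. With
    # allow_dupes, repeated keys are promoted to lists: "name:a;name:b"
    # yields {"name": ["a", "b"]}.
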
    def parse_memory_region_packet(self, context):
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate that the keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertTrue(
                key in [
                    "start",
                    "size",
                    "permissions",
                    "name",
                    "error"])
            self.assertIsNotNone(val)

        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict

    def assert_address_within_memory_region(
            self, test_address, mem_region_dict):
        self.assertIsNotNone(mem_region_dict)
        self.assertTrue("start" in mem_region_dict)
        self.assertTrue("size" in mem_region_dict)

        range_start = int(mem_region_dict["start"], 16)
        range_size = int(mem_region_dict["size"], 16)
        range_end = range_start + range_size

        if test_address < range_start:
            self.fail(
                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))
        elif test_address >= range_end:
            self.fail(
                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))

    def add_threadinfo_collection_packets(self):
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
              "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
              "save_key": "threadinfo_responses"}],
            True)

    def parse_threadinfo_packets(self, context):
        """Return an array of thread ids (decimal ints), one per thread."""
        threadinfo_responses = context.get("threadinfo_responses")
        self.assertIsNotNone(threadinfo_responses)

        thread_ids = []
        for threadinfo_response in threadinfo_responses:
            new_thread_infos = parse_threadinfo_response(threadinfo_response)
            thread_ids.extend(new_thread_infos)
        return thread_ids

    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
        start_time = time.time()
        timeout_time = start_time + timeout_seconds

        actual_thread_count = 0
        while actual_thread_count < thread_count:
            self.reset_test_sequence()
            self.add_threadinfo_collection_packets()

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            threads = self.parse_threadinfo_packets(context)
            self.assertIsNotNone(threads)

            actual_thread_count = len(threads)

            if time.time() > timeout_time:
                raise Exception(
                    'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format(
                        timeout_seconds, thread_count, actual_thread_count))

        return threads

    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify that the stub could set it.
                "send packet: $OK#00",
            ], True)

        if do_continue:
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)

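    # The Z/z packet format here is "$Z{type},{address:x},{kind}". For
    # example, add_set_breakpoint_packets(0x400123) queues
    # "read packet: $Z0,400123,1#00" (a software breakpoint of kind 1 at
    # 0x400123), and the matching removal below queues "$z0,400123,1#00".
    # (Address is an illustrative value.)
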
    def add_remove_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            breakpoint_kind=1):
        self.test_sequence.add_log_lines(
            [  # Remove the breakpoint.
                "read packet: $z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify that the stub could unset it.
                "send packet: $OK#00",
            ], True)

    def add_qSupported_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $qSupported#00",
             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
             ], True)

    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals"
    ]

    def parse_qSupported_response(self, context):
        self.assertIsNotNone(context)

        raw_response = context.get("qSupported_response")
        self.assertIsNotNone(raw_response)

        # For entries of the form key=val, the dict key and value are set as
        # expected. For feature+, feature- and feature?, the +, -, ? suffix
        # is stripped from the key and stored as the value.
        supported_dict = {}
        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
            key = match.group(1)
            val = match.group(3)

            # key=val: store as-is.
            if val and len(val) > 0:
                supported_dict[key] = val
            else:
                if len(key) < 2:
                    raise Exception(
                        "singular stub feature is too short: must be stub_feature{+,-,?}")
                supported_type = key[-1]
                key = key[:-1]
                if supported_type not in ["+", "-", "?"]:
                    raise Exception(
                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
                supported_dict[key] = supported_type
            # Ensure we know the supported element.
            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
                raise Exception(
                    "unknown qSupported stub feature reported: %s" %
                    key)

        return supported_dict

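    # For example, a raw response of "PacketSize=20000;QStartNoAckMode+;qEcho-"
    # parses to {"PacketSize": "20000", "QStartNoAckMode": "+", "qEcho": "-"}.
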
    def run_process_then_stop(self, run_seconds=1):
        # Tell the stub to continue.
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8"],
            True)
        context = self.expect_gdbremote_sequence()

        # Wait for run_seconds.
        time.sleep(run_seconds)

        # Send an interrupt, capture a T response.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: {}".format(chr(3)),
             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_result"))

        return context

    def select_modifiable_register(self, reg_infos):
        """Find a register that can be read/written freely."""
        PREFERRED_REGISTER_NAMES = set(["rax", ])

        # First check for the first register from the preferred register
        # name set.
        alternative_register_index = None

        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if ("name" in reg_info) and (
                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
                # We found a preferred register. Use it.
                return reg_info["lldb_register_index"]
            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
                                            reg_info["generic"] == "arg1"):
                # A frame pointer or first arg register will do as a
                # register to modify temporarily.
                alternative_register_index = reg_info["lldb_register_index"]

        # We didn't find a preferred register. Return whatever alternative
        # register we found, if any.
        return alternative_register_index

    def extract_registers_from_stop_notification(self, stop_key_vals_text):
        self.assertIsNotNone(stop_key_vals_text)
        kv_dict = self.parse_key_val_dict(stop_key_vals_text)

        registers = {}
        for (key, val) in list(kv_dict.items()):
            if re.match(r"^[0-9a-fA-F]+$", key):
                registers[int(key, 16)] = val
        return registers

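    # For example, given stop key/value text like
    # "thread:2f9a;00:12345678;01:9abcdef0" (values illustrative), only the
    # all-hex keys are register numbers, so this returns
    # {0: "12345678", 1: "9abcdef0"}; "thread" is filtered out.
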
    def gather_register_infos(self):
        self.reset_test_sequence()
        self.add_register_info_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.add_lldb_register_index(reg_infos)

        return reg_infos

    def find_generic_register_with_name(self, reg_infos, generic_name):
        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if ("generic" in reg_info) and (
                    reg_info["generic"] == generic_name):
                return reg_info
        return None

    def decode_gdbremote_binary(self, encoded_bytes):
        decoded_bytes = ""
        i = 0
        while i < len(encoded_bytes):
            if encoded_bytes[i] == "}":
                # Handle an escaped char.
                self.assertTrue(i + 1 < len(encoded_bytes))
                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
                i += 2
            elif encoded_bytes[i] == "*":
                # Handle run-length encoding.
                self.assertTrue(len(decoded_bytes) > 0)
                self.assertTrue(i + 1 < len(encoded_bytes))
                repeat_count = ord(encoded_bytes[i + 1]) - 29
                decoded_bytes += decoded_bytes[-1] * repeat_count
                i += 2
            else:
                decoded_bytes += encoded_bytes[i]
                i += 1
        return decoded_bytes

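    # Two gdb-remote binary encodings are handled above: "}" escapes the
    # next byte by XOR-ing it with 0x20 (so "}]" decodes to "}"), and "*"
    # repeats the previous character, with the repeat count encoded as
    # ord(next_char) - 29 (so "0* " decodes to "0000": ord(" ") - 29 = 3
    # extra zeros after the first).
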
    def build_auxv_dict(self, endian, word_size, auxv_data):
        self.assertIsNotNone(endian)
        self.assertIsNotNone(word_size)
        self.assertIsNotNone(auxv_data)

        auxv_dict = {}

        # PowerPC64le's auxvec has a special key that must be ignored.
        # This special key may be used multiple times, resulting in
        # multiple key/value pairs with the same key, which would otherwise
        # break this test's check for repeated keys.
        #
        # AT_IGNOREPPC = 22
        ignored_keys_for_arch = {'powerpc64le': [22]}
        arch = self.getArchitecture()
        ignore_keys = None
        if arch in ignored_keys_for_arch:
            ignore_keys = ignored_keys_for_arch[arch]

        while len(auxv_data) > 0:
            # Chop off the key.
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off the value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            if ignore_keys and key in ignore_keys:
                continue

            # Handle the terminating entry.
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        self.fail(
            "should not reach here - implies required double zero entry not found")
        return auxv_dict

    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until an 'l' response (with or without data) is returned."""
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        "direction": "send",
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move the offset along.
            offset += chunk_length

            # Figure out if we're done. We're done if the response type is
            # 'l'.
            done = response_type == "l"

            # Decode the binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data

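    # This follows the qXfer chunked-read convention: each response starts
    # with 'm' (more data follows) or 'l' (last chunk). A caller such as
    # read_binary_data_in_chunks("qXfer:auxv:read::", 0x400) (illustrative)
    # keeps requesting the next 0x400-byte window at an advancing offset
    # until an 'l' response arrives.
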
    def add_interrupt_packets(self):
        self.test_sequence.add_log_lines([
            # Send the interrupt.
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {"direction": "send",
             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
             "capture": {1: "stop_signo",
                         2: "stop_key_val_text"}},
        ], True)

    def parse_interrupt_packets(self, context):
        self.assertIsNotNone(context.get("stop_signo"))
        self.assertIsNotNone(context.get("stop_key_val_text"))
        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
            context["stop_key_val_text"]))

    def add_QSaveRegisterState_packets(self, thread_id):
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
                thread_id)
        else:
            request = "read packet: $QSaveRegisterState#00"

        self.test_sequence.add_log_lines([request,
                                          {"direction": "send",
                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
                                           "capture": {1: "save_response"}},
                                          ],
                                         True)

    def parse_QSaveRegisterState_response(self, context):
        self.assertIsNotNone(context)

        save_response = context.get("save_response")
        self.assertIsNotNone(save_response)

        if len(save_response) < 1 or save_response[0] == "E":
            # An error was received.
            return (False, None)
        else:
            return (True, int(save_response))

    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
                save_id, thread_id)
        else:
            request = "read packet: $QRestoreRegisterState:{}#00".format(
                save_id)

        self.test_sequence.add_log_lines([
            request,
            "send packet: $OK#00"
        ], True)

    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info. We're not
            # necessarily working off a full set of register infos, so an
            # inferred register index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            # Use floor division so the byte size stays an int on Python 3.
            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle the thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s.
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle the thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded. There are a handful of
            # registers that can fail, or partially fail (e.g. flags, segment
            # selectors, etc.) due to register value restrictions. Don't
            # worry about them all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute
                    # what's written. Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)

    def is_bit_flippable_register(self, reg_info):
        if not reg_info:
            return False
        if "set" not in reg_info:
            return False
        if reg_info["set"] != "General Purpose Registers":
            return False
        if ("container-regs" in reg_info) and (
                len(reg_info["container-regs"]) > 0):
            # Don't try to bit flip registers contained in another register.
            return False
        if re.match("^.s$", reg_info["name"]):
            # This is a 2-letter register name that ends in "s", like a
            # segment register. Don't try to bit flip these.
            return False
        if re.match("^(c|)psr$", reg_info["name"]):
            # This is an ARM program status register; don't flip it.
            return False
        # Okay, this looks fine enough.
        return True

    def read_register_values(self, reg_infos, endian, thread_id=None):
        self.assertIsNotNone(reg_infos)
        values = {}

        for reg_info in reg_infos:
            # We append a register index when loading reg infos so we can
            # work with subsets.
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle the thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert the value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(
                endian, p_response)

        return values

    def add_vCont_query_packets(self):
        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
                                          {"direction": "send",
                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
                                           "capture": {2: "vCont_query_response"}},
                                          ],
                                         True)

    def parse_vCont_query_response(self, context):
        self.assertIsNotNone(context)
        vCont_query_response = context.get("vCont_query_response")

        # Handle the case of no vCont support at all - in which case the
        # capture group will be none or zero length.
        if not vCont_query_response or len(vCont_query_response) == 0:
            return {}

        return {key: 1 for key in vCont_query_response.split(
            ";") if key and len(key) > 0}

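    # For example, a stub reply of "$vCont;c;C;t;s;S#11" captures
    # ";c;C;t;s;S", which parses to {"c": 1, "C": 1, "t": 1, "s": 1, "S": 1}.
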
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Used by the single step tests that appear in a few different contexts."""
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction. We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true. If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)

    def g_c1_c2_contents_are(self, args):
        """Used by the single step tests that appear in a few different contexts."""
        g_c1_address = args["g_c1_address"]
        g_c2_address = args["g_c2_address"]
        expected_g_c1 = args["expected_g_c1"]
        expected_g_c2 = args["expected_g_c2"]

        # Read the g_c1 and g_c2 contents.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Check if what we read from inferior memory is what we are
        # expecting.
        self.assertIsNotNone(context.get("g_c1_contents"))
        self.assertIsNotNone(context.get("g_c2_contents"))

        return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (
            context.get("g_c2_contents").decode("hex") == expected_g_c2)

    def single_step_only_steps_one_instruction(
            self, use_Hc_packet=True, step_instruction="s"):
        """Used by the single step tests that appear in a few different contexts."""
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "get-code-address-hex:swap_chars",
                "get-data-address-hex:g_c1",
                "get-data-address-hex:g_c2",
                "sleep:1",
                "call-function:swap_chars",
                "sleep:5"])

        # Run the process.
        self.test_sequence.add_log_lines(
            [  # Start running after the initial stop.
                "read packet: $c#63",
                # Match the output line that prints the memory address of the function call entry point.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the main thread id.
        self.assertIsNotNone(context.get("stop_thread_id"))
        main_thread_id = int(context.get("stop_thread_id"), 16)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Grab the data addresses.
        self.assertIsNotNone(context.get("g_c1_address"))
        g_c1_address = int(context.get("g_c1_address"), 16)

        self.assertIsNotNone(context.get("g_c2_address"))
        g_c2_address = int(context.get("g_c2_address"), 16)

        # Set a breakpoint at the given address.
        if self.getArchitecture() == "arm":
            # TODO: Handle the case of setting a breakpoint in thumb code.
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1
        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remove the breakpoint.
        self.reset_test_sequence()
        self.add_remove_breakpoint_packets(
            function_address, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify that g_c1 and g_c2 match the expected initial state.
        args = {}
        args["g_c1_address"] = g_c1_address
        args["g_c2_address"] = g_c2_address
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"

        self.assertTrue(self.g_c1_c2_contents_are(args))

        # Verify that we take only a small number of steps to hit the first
        # state. We might need to work through function entry prologue code.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=25,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)

        # Verify that we hit the next state.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        expected_step_count = 1
        arch = self.getArchitecture()

        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the
        # variable value.
        if re.match("mips", arch):
            expected_step_count = 3
        # S390X requires 2 machine instructions (LARL, MVI) to update the
        # variable value.
        if re.match("s390x", arch):
            expected_step_count = 2
        self.assertEqual(step_count, expected_step_count)

        # Verify that we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

        # Verify that we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

    def maybe_strict_output_regex(self, regex):
        return '.*' + regex + \
            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'

    def install_and_create_launch_args(self):
        exe_path = self.getBuildArtifact("a.out")
        if not lldb.remote_platform:
            return [exe_path]
        remote_path = lldbutil.append_to_process_working_directory(
            self, os.path.basename(exe_path))
        remote_file_spec = lldb.SBFileSpec(remote_path, False)
        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
                                           remote_file_spec)
        if err.Fail():
            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
                            (exe_path, remote_path, err))
        return [remote_path]