forked from OSchip/llvm-project
844 lines
34 KiB
Python
844 lines
34 KiB
Python
"""Module for supporting unit testing of the lldb-server debug monitor exe.
|
|
"""
|
|
|
|
import os
|
|
import os.path
|
|
import platform
|
|
import Queue
|
|
import re
|
|
import socket_packet_pump
|
|
import subprocess
|
|
import time
|
|
from lldbtest import *
|
|
|
|
def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename):
|
|
"""Return the debug monitor exe path given the lldb exe path.
|
|
|
|
This method attempts to construct a valid debug monitor exe name
|
|
from a given lldb exe name. It will return None if the synthesized
|
|
debug monitor name is not found to exist.
|
|
|
|
The debug monitor exe path is synthesized by taking the directory
|
|
of the lldb exe, and replacing the portion of the base name that
|
|
matches "lldb" (case insensitive) and replacing with the value of
|
|
debug_monitor_basename.
|
|
|
|
Args:
|
|
lldb_exe: the path to an lldb executable.
|
|
|
|
debug_monitor_basename: the base name portion of the debug monitor
|
|
that will replace 'lldb'.
|
|
|
|
Returns:
|
|
A path to the debug monitor exe if it is found to exist; otherwise,
|
|
returns None.
|
|
|
|
"""
|
|
|
|
exe_dir = os.path.dirname(lldb_exe)
|
|
exe_base = os.path.basename(lldb_exe)
|
|
|
|
# we'll rebuild the filename by replacing lldb with
|
|
# the debug monitor basename, keeping any prefix or suffix in place.
|
|
regex = re.compile(r"lldb", re.IGNORECASE)
|
|
new_base = regex.sub(debug_monitor_basename, exe_base)
|
|
|
|
debug_monitor_exe = os.path.join(exe_dir, new_base)
|
|
if os.path.exists(debug_monitor_exe):
|
|
return debug_monitor_exe
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_lldb_server_exe():
|
|
"""Return the lldb-server exe path.
|
|
|
|
Returns:
|
|
A path to the lldb-gdbserver exe if it is found to exist; otherwise,
|
|
returns None.
|
|
"""
|
|
if "LLDB_DEBUGSERVER_PATH" in os.environ:
|
|
return os.environ["LLDB_DEBUGSERVER_PATH"]
|
|
elif "LLDB_EXEC" in os.environ:
|
|
lldb_exe = os.environ["LLDB_EXEC"]
|
|
if not lldb_exe:
|
|
return None
|
|
else:
|
|
return _get_debug_monitor_from_lldb(lldb_exe, "lldb-server")
|
|
else:
|
|
return None
|
|
|
|
def get_debugserver_exe():
|
|
"""Return the debugserver exe path.
|
|
|
|
Returns:
|
|
A path to the debugserver exe if it is found to exist; otherwise,
|
|
returns None.
|
|
"""
|
|
if "LLDB_DEBUGSERVER_PATH" in os.environ:
|
|
return os.environ["LLDB_DEBUGSERVER_PATH"]
|
|
elif "LLDB_EXEC" in os.environ:
|
|
lldb_exe = os.environ["LLDB_EXEC"]
|
|
if not lldb_exe:
|
|
return None
|
|
else:
|
|
return _get_debug_monitor_from_lldb(lldb_exe, "debugserver")
|
|
else:
|
|
return None
|
|
|
|
|
|
_LOG_LINE_REGEX = re.compile(r'^(lldb-server|debugserver)\s+<\s*(\d+)>' +
|
|
'\s+(read|send)\s+packet:\s+(.+)$')
|
|
|
|
|
|
def _is_packet_lldb_gdbserver_input(packet_type, llgs_input_is_read):
|
|
"""Return whether a given packet is input for lldb-gdbserver.
|
|
|
|
Args:
|
|
packet_type: a string indicating 'send' or 'receive', from a
|
|
gdbremote packet protocol log.
|
|
|
|
llgs_input_is_read: true if lldb-gdbserver input (content sent to
|
|
lldb-gdbserver) is listed as 'read' or 'send' in the packet
|
|
log entry.
|
|
|
|
Returns:
|
|
True if the packet should be considered input for lldb-gdbserver; False
|
|
otherwise.
|
|
"""
|
|
if packet_type == 'read':
|
|
# when llgs is the read side, then a read packet is meant for
|
|
# input to llgs (when captured from the llgs/debugserver exe).
|
|
return llgs_input_is_read
|
|
elif packet_type == 'send':
|
|
# when llgs is the send side, then a send packet is meant to
|
|
# be input to llgs (when captured from the lldb exe).
|
|
return not llgs_input_is_read
|
|
else:
|
|
# don't understand what type of packet this is
|
|
raise "Unknown packet type: {}".format(packet_type)
|
|
|
|
|
|
def handle_O_packet(context, packet_contents, logger):
|
|
"""Handle O packets."""
|
|
if (not packet_contents) or (len(packet_contents) < 1):
|
|
return False
|
|
elif packet_contents[0] != "O":
|
|
return False
|
|
elif packet_contents == "OK":
|
|
return False
|
|
|
|
new_text = gdbremote_hex_decode_string(packet_contents[1:])
|
|
context["O_content"] += new_text
|
|
context["O_count"] += 1
|
|
|
|
if logger:
|
|
logger.debug("text: new \"{}\", cumulative: \"{}\"".format(new_text, context["O_content"]))
|
|
|
|
return True
|
|
|
|
_STRIP_CHECKSUM_REGEX = re.compile(r'#[0-9a-fA-F]{2}$')
|
|
_STRIP_COMMAND_PREFIX_REGEX = re.compile(r"^\$")
|
|
_STRIP_COMMAND_PREFIX_M_REGEX = re.compile(r"^\$m")
|
|
|
|
|
|
def assert_packets_equal(asserter, actual_packet, expected_packet):
|
|
# strip off the checksum digits of the packet. When we're in
|
|
# no-ack mode, the # checksum is ignored, and should not be cause
|
|
# for a mismatched packet.
|
|
actual_stripped = _STRIP_CHECKSUM_REGEX.sub('', actual_packet)
|
|
expected_stripped = _STRIP_CHECKSUM_REGEX.sub('', expected_packet)
|
|
asserter.assertEqual(actual_stripped, expected_stripped)
|
|
|
|
def expect_lldb_gdbserver_replay(
|
|
asserter,
|
|
sock,
|
|
test_sequence,
|
|
timeout_seconds,
|
|
logger=None):
|
|
"""Replay socket communication with lldb-gdbserver and verify responses.
|
|
|
|
Args:
|
|
asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance.
|
|
|
|
sock: the TCP socket connected to the lldb-gdbserver exe.
|
|
|
|
test_sequence: a GdbRemoteTestSequence instance that describes
|
|
the messages sent to the gdb remote and the responses
|
|
expected from it.
|
|
|
|
timeout_seconds: any response taking more than this number of
|
|
seconds will cause an exception to be raised.
|
|
|
|
logger: a Python logger instance.
|
|
|
|
Returns:
|
|
The context dictionary from running the given gdbremote
|
|
protocol sequence. This will contain any of the capture
|
|
elements specified to any GdbRemoteEntry instances in
|
|
test_sequence.
|
|
|
|
The context will also contain an entry, context["O_content"]
|
|
which contains the text from the inferior received via $O
|
|
packets. $O packets should not attempt to be matched
|
|
directly since they are not entirely deterministic as to
|
|
how many arrive and how much text is in each one.
|
|
|
|
context["O_count"] will contain an integer of the number of
|
|
O packets received.
|
|
"""
|
|
|
|
# Ensure we have some work to do.
|
|
if len(test_sequence.entries) < 1:
|
|
return {}
|
|
|
|
context = {"O_count":0, "O_content":""}
|
|
with socket_packet_pump.SocketPacketPump(sock, logger) as pump:
|
|
# Grab the first sequence entry.
|
|
sequence_entry = test_sequence.entries.pop(0)
|
|
|
|
# While we have an active sequence entry, send messages
|
|
# destined for the stub and collect/match/process responses
|
|
# expected from the stub.
|
|
while sequence_entry:
|
|
if sequence_entry.is_send_to_remote():
|
|
# This is an entry to send to the remote debug monitor.
|
|
send_packet = sequence_entry.get_send_packet()
|
|
if logger:
|
|
if len(send_packet) == 1 and send_packet[0] == chr(3):
|
|
packet_desc = "^C"
|
|
else:
|
|
packet_desc = send_packet
|
|
logger.info("sending packet to remote: {}".format(packet_desc))
|
|
sock.sendall(send_packet)
|
|
else:
|
|
# This is an entry expecting to receive content from the remote debug monitor.
|
|
|
|
# We'll pull from (and wait on) the queue appropriate for the type of matcher.
|
|
# We keep separate queues for process output (coming from non-deterministic
|
|
# $O packet division) and for all other packets.
|
|
if sequence_entry.is_output_matcher():
|
|
try:
|
|
# Grab next entry from the output queue.
|
|
content = pump.output_queue().get(True, timeout_seconds)
|
|
except Queue.Empty:
|
|
if logger:
|
|
logger.warning("timeout waiting for stub output (accumulated output:{})".format(pump.get_accumulated_output()))
|
|
raise Exception("timed out while waiting for output match (accumulated output: {})".format(pump.get_accumulated_output()))
|
|
else:
|
|
try:
|
|
content = pump.packet_queue().get(True, timeout_seconds)
|
|
except Queue.Empty:
|
|
if logger:
|
|
logger.warning("timeout waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
|
|
raise Exception("timed out while waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
|
|
|
|
# Give the sequence entry the opportunity to match the content.
|
|
# Output matchers might match or pass after more output accumulates.
|
|
# Other packet types generally must match.
|
|
asserter.assertIsNotNone(content)
|
|
context = sequence_entry.assert_match(asserter, content, context=context)
|
|
|
|
# Move on to next sequence entry as needed. Some sequence entries support executing multiple
|
|
# times in different states (for looping over query/response packets).
|
|
if sequence_entry.is_consumed():
|
|
if len(test_sequence.entries) > 0:
|
|
sequence_entry = test_sequence.entries.pop(0)
|
|
else:
|
|
sequence_entry = None
|
|
|
|
# Fill in the O_content entries.
|
|
context["O_count"] = 1
|
|
context["O_content"] = pump.get_accumulated_output()
|
|
|
|
return context
|
|
|
|
def gdbremote_hex_encode_string(str):
|
|
output = ''
|
|
for c in str:
|
|
output += '{0:02x}'.format(ord(c))
|
|
return output
|
|
|
|
def gdbremote_hex_decode_string(str):
|
|
return str.decode("hex")
|
|
|
|
def gdbremote_packet_encode_string(str):
|
|
checksum = 0
|
|
for c in str:
|
|
checksum += ord(c)
|
|
return '$' + str + '#{0:02x}'.format(checksum % 256)
|
|
|
|
def build_gdbremote_A_packet(args_list):
|
|
"""Given a list of args, create a properly-formed $A packet containing each arg.
|
|
"""
|
|
payload = "A"
|
|
|
|
# build the arg content
|
|
arg_index = 0
|
|
for arg in args_list:
|
|
# Comma-separate the args.
|
|
if arg_index > 0:
|
|
payload += ','
|
|
|
|
# Hex-encode the arg.
|
|
hex_arg = gdbremote_hex_encode_string(arg)
|
|
|
|
# Build the A entry.
|
|
payload += "{},{},{}".format(len(hex_arg), arg_index, hex_arg)
|
|
|
|
# Next arg index, please.
|
|
arg_index += 1
|
|
|
|
# return the packetized payload
|
|
return gdbremote_packet_encode_string(payload)
|
|
|
|
|
|
def parse_reg_info_response(response_packet):
|
|
if not response_packet:
|
|
raise Exception("response_packet cannot be None")
|
|
|
|
# Strip off prefix $ and suffix #xx if present.
|
|
response_packet = _STRIP_COMMAND_PREFIX_REGEX.sub("", response_packet)
|
|
response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet)
|
|
|
|
# Build keyval pairs
|
|
values = {}
|
|
for kv in response_packet.split(";"):
|
|
if len(kv) < 1:
|
|
continue
|
|
(key, val) = kv.split(':')
|
|
values[key] = val
|
|
|
|
return values
|
|
|
|
|
|
def parse_threadinfo_response(response_packet):
|
|
if not response_packet:
|
|
raise Exception("response_packet cannot be None")
|
|
|
|
# Strip off prefix $ and suffix #xx if present.
|
|
response_packet = _STRIP_COMMAND_PREFIX_M_REGEX.sub("", response_packet)
|
|
response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet)
|
|
|
|
# Return list of thread ids
|
|
return [int(thread_id_hex,16) for thread_id_hex in response_packet.split(",") if len(thread_id_hex) > 0]
|
|
|
|
def unpack_endian_binary_string(endian, value_string):
|
|
"""Unpack a gdb-remote binary (post-unescaped, i.e. not escaped) response to an unsigned int given endianness of the inferior."""
|
|
if not endian:
|
|
raise Exception("endian cannot be None")
|
|
if not value_string or len(value_string) < 1:
|
|
raise Exception("value_string cannot be None or empty")
|
|
|
|
if endian == 'little':
|
|
value = 0
|
|
i = 0
|
|
while len(value_string) > 0:
|
|
value += (ord(value_string[0]) << i)
|
|
value_string = value_string[1:]
|
|
i += 8
|
|
return value
|
|
elif endian == 'big':
|
|
value = 0
|
|
while len(value_string) > 0:
|
|
value = (value << 8) + ord(value_string[0])
|
|
value_string = value_string[1:]
|
|
return value
|
|
else:
|
|
# pdp is valid but need to add parse code once needed.
|
|
raise Exception("unsupported endian:{}".format(endian))
|
|
|
|
def unpack_register_hex_unsigned(endian, value_string):
|
|
"""Unpack a gdb-remote $p-style response to an unsigned int given endianness of inferior."""
|
|
if not endian:
|
|
raise Exception("endian cannot be None")
|
|
if not value_string or len(value_string) < 1:
|
|
raise Exception("value_string cannot be None or empty")
|
|
|
|
if endian == 'little':
|
|
value = 0
|
|
i = 0
|
|
while len(value_string) > 0:
|
|
value += (int(value_string[0:2], 16) << i)
|
|
value_string = value_string[2:]
|
|
i += 8
|
|
return value
|
|
elif endian == 'big':
|
|
return int(value_string, 16)
|
|
else:
|
|
# pdp is valid but need to add parse code once needed.
|
|
raise Exception("unsupported endian:{}".format(endian))
|
|
|
|
def pack_register_hex(endian, value, byte_size=None):
|
|
"""Unpack a gdb-remote $p-style response to an unsigned int given endianness of inferior."""
|
|
if not endian:
|
|
raise Exception("endian cannot be None")
|
|
|
|
if endian == 'little':
|
|
# Create the litt-endian return value.
|
|
retval = ""
|
|
while value != 0:
|
|
retval = retval + "{:02x}".format(value & 0xff)
|
|
value = value >> 8
|
|
if byte_size:
|
|
# Add zero-fill to the right/end (MSB side) of the value.
|
|
retval += "00" * (byte_size - len(retval)/2)
|
|
return retval
|
|
|
|
elif endian == 'big':
|
|
retval = value.encode("hex")
|
|
if byte_size:
|
|
# Add zero-fill to the left/front (MSB side) of the value.
|
|
retval = ("00" * (byte_size - len(retval)/2)) + retval
|
|
return retval
|
|
|
|
else:
|
|
# pdp is valid but need to add parse code once needed.
|
|
raise Exception("unsupported endian:{}".format(endian))
|
|
|
|
class GdbRemoteEntryBase(object):
|
|
def is_output_matcher(self):
|
|
return False
|
|
|
|
class GdbRemoteEntry(GdbRemoteEntryBase):
|
|
|
|
def __init__(self, is_send_to_remote=True, exact_payload=None, regex=None, capture=None, expect_captures=None):
|
|
"""Create an entry representing one piece of the I/O to/from a gdb remote debug monitor.
|
|
|
|
Args:
|
|
|
|
is_send_to_remote: True if this entry is a message to be
|
|
sent to the gdbremote debug monitor; False if this
|
|
entry represents text to be matched against the reply
|
|
from the gdbremote debug monitor.
|
|
|
|
exact_payload: if not None, then this packet is an exact
|
|
send (when sending to the remote) or an exact match of
|
|
the response from the gdbremote. The checksums are
|
|
ignored on exact match requests since negotiation of
|
|
no-ack makes the checksum content essentially
|
|
undefined.
|
|
|
|
regex: currently only valid for receives from gdbremote.
|
|
When specified (and only if exact_payload is None),
|
|
indicates the gdbremote response must match the given
|
|
regex. Match groups in the regex can be used for two
|
|
different purposes: saving the match (see capture
|
|
arg), or validating that a match group matches a
|
|
previously established value (see expect_captures). It
|
|
is perfectly valid to have just a regex arg and to
|
|
specify neither capture or expect_captures args. This
|
|
arg only makes sense if exact_payload is not
|
|
specified.
|
|
|
|
capture: if specified, is a dictionary of regex match
|
|
group indices (should start with 1) to variable names
|
|
that will store the capture group indicated by the
|
|
index. For example, {1:"thread_id"} will store capture
|
|
group 1's content in the context dictionary where
|
|
"thread_id" is the key and the match group value is
|
|
the value. The value stored off can be used later in a
|
|
expect_captures expression. This arg only makes sense
|
|
when regex is specified.
|
|
|
|
expect_captures: if specified, is a dictionary of regex
|
|
match group indices (should start with 1) to variable
|
|
names, where the match group should match the value
|
|
existing in the context at the given variable name.
|
|
For example, {2:"thread_id"} indicates that the second
|
|
match group must match the value stored under the
|
|
context's previously stored "thread_id" key. This arg
|
|
only makes sense when regex is specified.
|
|
"""
|
|
self._is_send_to_remote = is_send_to_remote
|
|
self.exact_payload = exact_payload
|
|
self.regex = regex
|
|
self.capture = capture
|
|
self.expect_captures = expect_captures
|
|
|
|
def is_send_to_remote(self):
|
|
return self._is_send_to_remote
|
|
|
|
def is_consumed(self):
|
|
# For now, all packets are consumed after first use.
|
|
return True
|
|
|
|
def get_send_packet(self):
|
|
if not self.is_send_to_remote():
|
|
raise Exception("get_send_packet() called on GdbRemoteEntry that is not a send-to-remote packet")
|
|
if not self.exact_payload:
|
|
raise Exception("get_send_packet() called on GdbRemoteEntry but it doesn't have an exact payload")
|
|
return self.exact_payload
|
|
|
|
def _assert_exact_payload_match(self, asserter, actual_packet):
|
|
assert_packets_equal(asserter, actual_packet, self.exact_payload)
|
|
return None
|
|
|
|
def _assert_regex_match(self, asserter, actual_packet, context):
|
|
# Ensure the actual packet matches from the start of the actual packet.
|
|
match = self.regex.match(actual_packet)
|
|
if not match:
|
|
asserter.fail("regex '{}' failed to match against content '{}'".format(self.regex.pattern, actual_packet))
|
|
|
|
if self.capture:
|
|
# Handle captures.
|
|
for group_index, var_name in self.capture.items():
|
|
capture_text = match.group(group_index)
|
|
# It is okay for capture text to be None - which it will be if it is a group that can match nothing.
|
|
# The user must be okay with it since the regex itself matched above.
|
|
context[var_name] = capture_text
|
|
|
|
if self.expect_captures:
|
|
# Handle comparing matched groups to context dictionary entries.
|
|
for group_index, var_name in self.expect_captures.items():
|
|
capture_text = match.group(group_index)
|
|
if not capture_text:
|
|
raise Exception("No content to expect for group index {}".format(group_index))
|
|
asserter.assertEquals(capture_text, context[var_name])
|
|
|
|
return context
|
|
|
|
def assert_match(self, asserter, actual_packet, context=None):
|
|
# This only makes sense for matching lines coming from the
|
|
# remote debug monitor.
|
|
if self.is_send_to_remote():
|
|
raise Exception("Attempted to match a packet being sent to the remote debug monitor, doesn't make sense.")
|
|
|
|
# Create a new context if needed.
|
|
if not context:
|
|
context = {}
|
|
|
|
# If this is an exact payload, ensure they match exactly,
|
|
# ignoring the packet checksum which is optional for no-ack
|
|
# mode.
|
|
if self.exact_payload:
|
|
self._assert_exact_payload_match(asserter, actual_packet)
|
|
return context
|
|
elif self.regex:
|
|
return self._assert_regex_match(asserter, actual_packet, context)
|
|
else:
|
|
raise Exception("Don't know how to match a remote-sent packet when exact_payload isn't specified.")
|
|
|
|
class MultiResponseGdbRemoteEntry(GdbRemoteEntryBase):
|
|
"""Represents a query/response style packet.
|
|
|
|
Assumes the first item is sent to the gdb remote.
|
|
An end sequence regex indicates the end of the query/response
|
|
packet sequence. All responses up through (but not including) the
|
|
end response are stored in a context variable.
|
|
|
|
Settings accepted from params:
|
|
|
|
next_query or query: required. The typical query packet without the $ prefix or #xx suffix.
|
|
If there is a special first packet to start the iteration query, see the
|
|
first_query key.
|
|
|
|
first_query: optional. If the first query requires a special query command, specify
|
|
it with this key. Do not specify the $ prefix or #xx suffix.
|
|
|
|
append_iteration_suffix: defaults to False. Specify True if the 0-based iteration
|
|
index should be appended as a suffix to the command. e.g. qRegisterInfo with
|
|
this key set true will generate query packets of qRegisterInfo0, qRegisterInfo1,
|
|
etc.
|
|
|
|
end_regex: required. Specifies a compiled regex object that will match the full text
|
|
of any response that signals an end to the iteration. It must include the
|
|
initial $ and ending #xx and must match the whole packet.
|
|
|
|
save_key: required. Specifies the key within the context where an array will be stored.
|
|
Each packet received from the gdb remote that does not match the end_regex will get
|
|
appended to the array stored within the context at that key.
|
|
|
|
runaway_response_count: optional. Defaults to 10000. If this many responses are retrieved,
|
|
assume there is something wrong with either the response collection or the ending
|
|
detection regex and throw an exception.
|
|
"""
|
|
def __init__(self, params):
|
|
self._next_query = params.get("next_query", params.get("query"))
|
|
if not self._next_query:
|
|
raise "either next_query or query key must be specified for MultiResponseGdbRemoteEntry"
|
|
|
|
self._first_query = params.get("first_query", self._next_query)
|
|
self._append_iteration_suffix = params.get("append_iteration_suffix", False)
|
|
self._iteration = 0
|
|
self._end_regex = params["end_regex"]
|
|
self._save_key = params["save_key"]
|
|
self._runaway_response_count = params.get("runaway_response_count", 10000)
|
|
self._is_send_to_remote = True
|
|
self._end_matched = False
|
|
|
|
def is_send_to_remote(self):
|
|
return self._is_send_to_remote
|
|
|
|
def get_send_packet(self):
|
|
if not self.is_send_to_remote():
|
|
raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry that is not in the send state")
|
|
if self._end_matched:
|
|
raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")
|
|
|
|
# Choose the first or next query for the base payload.
|
|
if self._iteration == 0 and self._first_query:
|
|
payload = self._first_query
|
|
else:
|
|
payload = self._next_query
|
|
|
|
# Append the suffix as needed.
|
|
if self._append_iteration_suffix:
|
|
payload += "%x" % self._iteration
|
|
|
|
# Keep track of the iteration.
|
|
self._iteration += 1
|
|
|
|
# Now that we've given the query packet, flip the mode to receive/match.
|
|
self._is_send_to_remote = False
|
|
|
|
# Return the result, converted to packet form.
|
|
return gdbremote_packet_encode_string(payload)
|
|
|
|
def is_consumed(self):
|
|
return self._end_matched
|
|
|
|
def assert_match(self, asserter, actual_packet, context=None):
|
|
# This only makes sense for matching lines coming from the remote debug monitor.
|
|
if self.is_send_to_remote():
|
|
raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but state is set to send a query packet.")
|
|
|
|
if self._end_matched:
|
|
raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")
|
|
|
|
# Set up a context as needed.
|
|
if not context:
|
|
context = {}
|
|
|
|
# Check if the packet matches the end condition.
|
|
match = self._end_regex.match(actual_packet)
|
|
if match:
|
|
# We're done iterating.
|
|
self._end_matched = True
|
|
return context
|
|
|
|
# Not done iterating - save the packet.
|
|
context[self._save_key] = context.get(self._save_key, [])
|
|
context[self._save_key].append(actual_packet)
|
|
|
|
# Check for a runaway response cycle.
|
|
if len(context[self._save_key]) >= self._runaway_response_count:
|
|
raise Exception("runaway query/response cycle detected: %d responses captured so far. Last response: %s" %
|
|
(len(context[self._save_key]), context[self._save_key][-1]))
|
|
|
|
# Flip the mode to send for generating the query.
|
|
self._is_send_to_remote = True
|
|
return context
|
|
|
|
class MatchRemoteOutputEntry(GdbRemoteEntryBase):
|
|
"""Waits for output from the debug monitor to match a regex or time out.
|
|
|
|
This entry type tries to match each time new gdb remote output is accumulated
|
|
using a provided regex. If the output does not match the regex within the
|
|
given timeframe, the command fails the playback session. If the regex does
|
|
match, any capture fields are recorded in the context.
|
|
|
|
Settings accepted from params:
|
|
|
|
regex: required. Specifies a compiled regex object that must either succeed
|
|
with re.match or re.search (see regex_mode below) within the given timeout
|
|
(see timeout_seconds below) or cause the playback to fail.
|
|
|
|
regex_mode: optional. Available values: "match" or "search". If "match", the entire
|
|
stub output as collected so far must match the regex. If search, then the regex
|
|
must match starting somewhere within the output text accumulated thus far.
|
|
Default: "match" (i.e. the regex must match the entirety of the accumulated output
|
|
buffer, so unexpected text will generally fail the match).
|
|
|
|
capture: optional. If specified, is a dictionary of regex match group indices (should start
|
|
with 1) to variable names that will store the capture group indicated by the
|
|
index. For example, {1:"thread_id"} will store capture group 1's content in the
|
|
context dictionary where "thread_id" is the key and the match group value is
|
|
the value. The value stored off can be used later in a expect_captures expression.
|
|
This arg only makes sense when regex is specified.
|
|
"""
|
|
def __init__(self, regex=None, regex_mode="match", capture=None):
|
|
self._regex = regex
|
|
self._regex_mode = regex_mode
|
|
self._capture = capture
|
|
self._matched = False
|
|
|
|
if not self._regex:
|
|
raise Exception("regex cannot be None")
|
|
|
|
if not self._regex_mode in ["match", "search"]:
|
|
raise Exception("unsupported regex mode \"{}\": must be \"match\" or \"search\"".format(self._regex_mode))
|
|
|
|
def is_output_matcher(self):
|
|
return True
|
|
|
|
def is_send_to_remote(self):
|
|
# This is always a "wait for remote" command.
|
|
return False
|
|
|
|
def is_consumed(self):
|
|
return self._matched
|
|
|
|
def assert_match(self, asserter, accumulated_output, context):
|
|
# Validate args.
|
|
if not accumulated_output:
|
|
raise Exception("accumulated_output cannot be none")
|
|
if not context:
|
|
raise Exception("context cannot be none")
|
|
|
|
# Validate that we haven't already matched.
|
|
if self._matched:
|
|
raise Exception("invalid state - already matched, attempting to match again")
|
|
|
|
# If we don't have any content yet, we don't match.
|
|
if len(accumulated_output) < 1:
|
|
return context
|
|
|
|
# Check if we match
|
|
if self._regex_mode == "match":
|
|
match = self._regex.match(accumulated_output)
|
|
elif self._regex_mode == "search":
|
|
match = self._regex.search(accumulated_output)
|
|
else:
|
|
raise Exception("Unexpected regex mode: {}".format(self._regex_mode))
|
|
|
|
# If we don't match, wait to try again after next $O content, or time out.
|
|
if not match:
|
|
# print "re pattern \"{}\" did not match against \"{}\"".format(self._regex.pattern, accumulated_output)
|
|
return context
|
|
|
|
# We do match.
|
|
self._matched = True
|
|
# print "re pattern \"{}\" matched against \"{}\"".format(self._regex.pattern, accumulated_output)
|
|
|
|
# Collect up any captures into the context.
|
|
if self._capture:
|
|
# Handle captures.
|
|
for group_index, var_name in self._capture.items():
|
|
capture_text = match.group(group_index)
|
|
if not capture_text:
|
|
raise Exception("No content for group index {}".format(group_index))
|
|
context[var_name] = capture_text
|
|
|
|
return context
|
|
|
|
|
|
class GdbRemoteTestSequence(object):
|
|
|
|
_LOG_LINE_REGEX = re.compile(r'^.*(read|send)\s+packet:\s+(.+)$')
|
|
|
|
def __init__(self, logger):
|
|
self.entries = []
|
|
self.logger = logger
|
|
|
|
def add_log_lines(self, log_lines, remote_input_is_read):
|
|
for line in log_lines:
|
|
if type(line) == str:
|
|
# Handle log line import
|
|
# if self.logger:
|
|
# self.logger.debug("processing log line: {}".format(line))
|
|
match = self._LOG_LINE_REGEX.match(line)
|
|
if match:
|
|
playback_packet = match.group(2)
|
|
direction = match.group(1)
|
|
if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
|
|
# Handle as something to send to the remote debug monitor.
|
|
# if self.logger:
|
|
# self.logger.info("processed packet to send to remote: {}".format(playback_packet))
|
|
self.entries.append(GdbRemoteEntry(is_send_to_remote=True, exact_payload=playback_packet))
|
|
else:
|
|
# Log line represents content to be expected from the remote debug monitor.
|
|
# if self.logger:
|
|
# self.logger.info("receiving packet from llgs, should match: {}".format(playback_packet))
|
|
self.entries.append(GdbRemoteEntry(is_send_to_remote=False,exact_payload=playback_packet))
|
|
else:
|
|
raise Exception("failed to interpret log line: {}".format(line))
|
|
elif type(line) == dict:
|
|
entry_type = line.get("type", "regex_capture")
|
|
if entry_type == "regex_capture":
|
|
# Handle more explicit control over details via dictionary.
|
|
direction = line.get("direction", None)
|
|
regex = line.get("regex", None)
|
|
capture = line.get("capture", None)
|
|
expect_captures = line.get("expect_captures", None)
|
|
|
|
# Compile the regex.
|
|
if regex and (type(regex) == str):
|
|
regex = re.compile(regex)
|
|
|
|
if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
|
|
# Handle as something to send to the remote debug monitor.
|
|
# if self.logger:
|
|
# self.logger.info("processed dict sequence to send to remote")
|
|
self.entries.append(GdbRemoteEntry(is_send_to_remote=True, regex=regex, capture=capture, expect_captures=expect_captures))
|
|
else:
|
|
# Log line represents content to be expected from the remote debug monitor.
|
|
# if self.logger:
|
|
# self.logger.info("processed dict sequence to match receiving from remote")
|
|
self.entries.append(GdbRemoteEntry(is_send_to_remote=False, regex=regex, capture=capture, expect_captures=expect_captures))
|
|
elif entry_type == "multi_response":
|
|
self.entries.append(MultiResponseGdbRemoteEntry(line))
|
|
elif entry_type == "output_match":
|
|
|
|
regex = line.get("regex", None)
|
|
# Compile the regex.
|
|
if regex and (type(regex) == str):
|
|
regex = re.compile(regex)
|
|
|
|
regex_mode = line.get("regex_mode", "match")
|
|
capture = line.get("capture", None)
|
|
self.entries.append(MatchRemoteOutputEntry(regex=regex, regex_mode=regex_mode, capture=capture))
|
|
else:
|
|
raise Exception("unknown entry type \"%s\"" % entry_type)
|
|
|
|
def process_is_running(pid, unknown_value=True):
|
|
"""If possible, validate that the given pid represents a running process on the local system.
|
|
|
|
Args:
|
|
|
|
pid: an OS-specific representation of a process id. Should be an integral value.
|
|
|
|
unknown_value: value used when we cannot determine how to check running local
|
|
processes on the OS.
|
|
|
|
Returns:
|
|
|
|
If we can figure out how to check running process ids on the given OS:
|
|
return True if the process is running, or False otherwise.
|
|
|
|
If we don't know how to check running process ids on the given OS:
|
|
return the value provided by the unknown_value arg.
|
|
"""
|
|
if type(pid) not in [int, long]:
|
|
raise Exception("pid must be of type int (actual type: %s)" % str(type(pid)))
|
|
|
|
process_ids = []
|
|
|
|
if lldb.remote_platform:
|
|
# Don't know how to get list of running process IDs on a remote
|
|
# platform
|
|
return unknown_value
|
|
elif platform.system() in ['Darwin', 'Linux', 'FreeBSD', 'NetBSD']:
|
|
# Build the list of running process ids
|
|
output = subprocess.check_output("ps ax | awk '{ print $1; }'", shell=True)
|
|
text_process_ids = output.split('\n')[1:]
|
|
# Convert text pids to ints
|
|
process_ids = [int(text_pid) for text_pid in text_process_ids if text_pid != '']
|
|
# elif {your_platform_here}:
|
|
# fill in process_ids as a list of int type process IDs running on
|
|
# the local system.
|
|
else:
|
|
# Don't know how to get list of running process IDs on this
|
|
# OS, so return the "don't know" value.
|
|
return unknown_value
|
|
|
|
# Check if the pid is in the process_ids
|
|
return pid in process_ids
|
|
|
|
if __name__ == '__main__':
|
|
EXE_PATH = get_lldb_server_exe()
|
|
if EXE_PATH:
|
|
print "lldb-server path detected: {}".format(EXE_PATH)
|
|
else:
|
|
print "lldb-server could not be found"
|