llvm-project/lldb/test/tools/lldb-server/socket_packet_pump.py

182 lines
6.4 KiB
Python

import Queue
import re
import select
import threading
import traceback
import codecs
def _handle_output_packet_string(packet_contents):
if (not packet_contents) or (len(packet_contents) < 1):
return None
elif packet_contents[0] != "O":
return None
elif packet_contents == "OK":
return None
else:
return packet_contents[1:].decode("hex")
def _dump_queue(the_queue):
while not the_queue.empty():
print codecs.encode(the_queue.get(True), "string_escape")
print "\n"
class SocketPacketPump(object):
"""A threaded packet reader that partitions packets into two streams.
All incoming $O packet content is accumulated with the current accumulation
state put into the OutputQueue.
All other incoming packets are placed in the packet queue.
A select thread can be started and stopped, and runs to place packet
content into the two queues.
"""
_GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
def __init__(self, pump_socket, logger=None):
if not pump_socket:
raise Exception("pump_socket cannot be None")
self._output_queue = Queue.Queue()
self._packet_queue = Queue.Queue()
self._thread = None
self._stop_thread = False
self._socket = pump_socket
self._logger = logger
self._receive_buffer = ""
self._accumulated_output = ""
def __enter__(self):
"""Support the python 'with' statement.
Start the pump thread."""
self.start_pump_thread()
return self
def __exit__(self, exit_type, value, the_traceback):
"""Support the python 'with' statement.
Shut down the pump thread."""
self.stop_pump_thread()
# Warn if there is any content left in any of the queues.
# That would represent unmatched packets.
if not self.output_queue().empty():
print "warning: output queue entries still exist:"
_dump_queue(self.output_queue())
print "from here:"
traceback.print_stack()
if not self.packet_queue().empty():
print "warning: packet queue entries still exist:"
_dump_queue(self.packet_queue())
print "from here:"
traceback.print_stack()
def start_pump_thread(self):
if self._thread:
raise Exception("pump thread is already running")
self._stop_thread = False
self._thread = threading.Thread(target=self._run_method)
self._thread.start()
def stop_pump_thread(self):
self._stop_thread = True
if self._thread:
self._thread.join()
def output_queue(self):
return self._output_queue
def packet_queue(self):
return self._packet_queue
def _process_new_bytes(self, new_bytes):
if not new_bytes:
return
if len(new_bytes) < 1:
return
# Add new bytes to our accumulated unprocessed packet bytes.
self._receive_buffer += new_bytes
# Parse fully-formed packets into individual packets.
has_more = len(self._receive_buffer) > 0
while has_more:
if len(self._receive_buffer) <= 0:
has_more = False
# handle '+' ack
elif self._receive_buffer[0] == "+":
self._packet_queue.put("+")
self._receive_buffer = self._receive_buffer[1:]
if self._logger:
self._logger.debug(
"parsed packet from stub: +\n" +
"new receive_buffer: {}".format(
self._receive_buffer))
else:
packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
self._receive_buffer)
if packet_match:
# Our receive buffer matches a packet at the
# start of the receive buffer.
new_output_content = _handle_output_packet_string(
packet_match.group(1))
if new_output_content:
# This was an $O packet with new content.
self._accumulated_output += new_output_content
self._output_queue.put(self._accumulated_output)
else:
# Any packet other than $O.
self._packet_queue.put(packet_match.group(0))
# Remove the parsed packet from the receive
# buffer.
self._receive_buffer = self._receive_buffer[
len(packet_match.group(0)):]
if self._logger:
self._logger.debug(
"parsed packet from stub: " +
packet_match.group(0))
self._logger.debug(
"new receive_buffer: " +
self._receive_buffer)
else:
# We don't have enough in the receive bufferto make a full
# packet. Stop trying until we read more.
has_more = False
def _run_method(self):
self._receive_buffer = ""
self._accumulated_output = ""
if self._logger:
self._logger.info("socket pump starting")
# Keep looping around until we're asked to stop the thread.
while not self._stop_thread:
can_read, _, _ = select.select([self._socket], [], [], 0)
if can_read and self._socket in can_read:
try:
new_bytes = self._socket.recv(4096)
if self._logger and new_bytes and len(new_bytes) > 0:
self._logger.debug(
"pump received bytes: {}".format(new_bytes))
except:
# Likely a closed socket. Done with the pump thread.
if self._logger:
self._logger.debug(
"socket read failed, stopping pump read thread")
break
self._process_new_bytes(new_bytes)
if self._logger:
self._logger.info("socket pump exiting")
def get_accumulated_output(self):
return self._accumulated_output
def get_receive_buffer(self):
return self._receive_buffer