from __future__ import print_function import re import select import threading import traceback import codecs from six.moves import queue def _handle_output_packet_string(packet_contents): if (not packet_contents) or (len(packet_contents) < 1): return None elif packet_contents[0] != "O": return None elif packet_contents == "OK": return None else: return packet_contents[1:].decode("hex") def _dump_queue(the_queue): while not the_queue.empty(): print(codecs.encode(the_queue.get(True), "string_escape")) print("\n") class SocketPacketPump(object): """A threaded packet reader that partitions packets into two streams. All incoming $O packet content is accumulated with the current accumulation state put into the OutputQueue. All other incoming packets are placed in the packet queue. A select thread can be started and stopped, and runs to place packet content into the two queues. """ _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}') def __init__(self, pump_socket, logger=None): if not pump_socket: raise Exception("pump_socket cannot be None") self._output_queue = queue.Queue() self._packet_queue = queue.Queue() self._thread = None self._stop_thread = False self._socket = pump_socket self._logger = logger self._receive_buffer = "" self._accumulated_output = "" def __enter__(self): """Support the python 'with' statement. Start the pump thread.""" self.start_pump_thread() return self def __exit__(self, exit_type, value, the_traceback): """Support the python 'with' statement. Shut down the pump thread.""" self.stop_pump_thread() # Warn if there is any content left in any of the queues. # That would represent unmatched packets. if not self.output_queue().empty(): print("warning: output queue entries still exist:") _dump_queue(self.output_queue()) print("from here:") traceback.print_stack() if not self.packet_queue().empty(): print("warning: packet queue entries still exist:") _dump_queue(self.packet_queue()) print("from here:") traceback.print_stack() def start_pump_thread(self): if self._thread: raise Exception("pump thread is already running") self._stop_thread = False self._thread = threading.Thread(target=self._run_method) self._thread.start() def stop_pump_thread(self): self._stop_thread = True if self._thread: self._thread.join() def output_queue(self): return self._output_queue def packet_queue(self): return self._packet_queue def _process_new_bytes(self, new_bytes): if not new_bytes: return if len(new_bytes) < 1: return # Add new bytes to our accumulated unprocessed packet bytes. self._receive_buffer += new_bytes # Parse fully-formed packets into individual packets. has_more = len(self._receive_buffer) > 0 while has_more: if len(self._receive_buffer) <= 0: has_more = False # handle '+' ack elif self._receive_buffer[0] == "+": self._packet_queue.put("+") self._receive_buffer = self._receive_buffer[1:] if self._logger: self._logger.debug( "parsed packet from stub: +\n" + "new receive_buffer: {}".format( self._receive_buffer)) else: packet_match = self._GDB_REMOTE_PACKET_REGEX.match( self._receive_buffer) if packet_match: # Our receive buffer matches a packet at the # start of the receive buffer. new_output_content = _handle_output_packet_string( packet_match.group(1)) if new_output_content: # This was an $O packet with new content. self._accumulated_output += new_output_content self._output_queue.put(self._accumulated_output) else: # Any packet other than $O. self._packet_queue.put(packet_match.group(0)) # Remove the parsed packet from the receive # buffer. self._receive_buffer = self._receive_buffer[ len(packet_match.group(0)):] if self._logger: self._logger.debug( "parsed packet from stub: " + packet_match.group(0)) self._logger.debug( "new receive_buffer: " + self._receive_buffer) else: # We don't have enough in the receive bufferto make a full # packet. Stop trying until we read more. has_more = False def _run_method(self): self._receive_buffer = "" self._accumulated_output = "" if self._logger: self._logger.info("socket pump starting") # Keep looping around until we're asked to stop the thread. while not self._stop_thread: can_read, _, _ = select.select([self._socket], [], [], 0) if can_read and self._socket in can_read: try: new_bytes = self._socket.recv(4096) if self._logger and new_bytes and len(new_bytes) > 0: self._logger.debug( "pump received bytes: {}".format(new_bytes)) except: # Likely a closed socket. Done with the pump thread. if self._logger: self._logger.debug( "socket read failed, stopping pump read thread") break self._process_new_bytes(new_bytes) if self._logger: self._logger.info("socket pump exiting") def get_accumulated_output(self): return self._accumulated_output def get_receive_buffer(self): return self._receive_buffer