path: root/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py')
-rw-r--r--  packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py  187
1 file changed, 187 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
new file mode 100644
index 000000000000..795a8c6652d0
--- /dev/null
+++ b/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
@@ -0,0 +1,187 @@
+
+from __future__ import print_function
+
+
+
+import re
+import select
+import threading
+import traceback
+import codecs
+
+from six.moves import queue
+
+def _handle_output_packet_string(packet_contents):
+    if (not packet_contents) or (len(packet_contents) < 1):
+        return None
+    elif packet_contents[0] != "O":
+        return None
+    elif packet_contents == "OK":
+        return None
+    else:
+        return packet_contents[1:].decode("hex")
+
+def _dump_queue(the_queue):
+    while not the_queue.empty():
+        print(codecs.encode(the_queue.get(True), "string_escape"))
+        print("\n")
+
+class SocketPacketPump(object):
+    """A threaded packet reader that partitions packets into two streams.
+
+    All incoming $O packet content is accumulated, with the current
+    accumulated state put into the output queue.
+
+    All other incoming packets are placed in the packet queue.
+
+    A select thread can be started and stopped, and runs to place packet
+    content into the two queues.
+    """
+
+    _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
+
+    def __init__(self, pump_socket, logger=None):
+        if not pump_socket:
+            raise Exception("pump_socket cannot be None")
+
+        self._output_queue = queue.Queue()
+        self._packet_queue = queue.Queue()
+        self._thread = None
+        self._stop_thread = False
+        self._socket = pump_socket
+        self._logger = logger
+        self._receive_buffer = ""
+        self._accumulated_output = ""
+
+    def __enter__(self):
+        """Support the Python 'with' statement.
+
+        Start the pump thread."""
+        self.start_pump_thread()
+        return self
+
+    def __exit__(self, exit_type, value, the_traceback):
+        """Support the Python 'with' statement.
+
+        Shut down the pump thread."""
+        self.stop_pump_thread()
+
+        # Warn if there is any content left in any of the queues.
+        # That would represent unmatched packets.
+        if not self.output_queue().empty():
+            print("warning: output queue entries still exist:")
+            _dump_queue(self.output_queue())
+            print("from here:")
+            traceback.print_stack()
+
+        if not self.packet_queue().empty():
+            print("warning: packet queue entries still exist:")
+            _dump_queue(self.packet_queue())
+            print("from here:")
+            traceback.print_stack()
+
+    def start_pump_thread(self):
+        if self._thread:
+            raise Exception("pump thread is already running")
+        self._stop_thread = False
+        self._thread = threading.Thread(target=self._run_method)
+        self._thread.start()
+
+    def stop_pump_thread(self):
+        self._stop_thread = True
+        if self._thread:
+            self._thread.join()
+
+    def output_queue(self):
+        return self._output_queue
+
+    def packet_queue(self):
+        return self._packet_queue
+
+    def _process_new_bytes(self, new_bytes):
+        if not new_bytes:
+            return
+        if len(new_bytes) < 1:
+            return
+
+        # Add new bytes to our accumulated unprocessed packet bytes.
+        self._receive_buffer += new_bytes
+
+        # Parse fully-formed packets into individual packets.
+        has_more = len(self._receive_buffer) > 0
+        while has_more:
+            if len(self._receive_buffer) <= 0:
+                has_more = False
+            # handle '+' ack
+            elif self._receive_buffer[0] == "+":
+                self._packet_queue.put("+")
+                self._receive_buffer = self._receive_buffer[1:]
+                if self._logger:
+                    self._logger.debug(
+                        "parsed packet from stub: +\n" +
+                        "new receive_buffer: {}".format(
+                            self._receive_buffer))
+            else:
+                packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
+                    self._receive_buffer)
+                if packet_match:
+                    # Our receive buffer matches a packet at the
+                    # start of the receive buffer.
+                    new_output_content = _handle_output_packet_string(
+                        packet_match.group(1))
+                    if new_output_content:
+                        # This was an $O packet with new content.
+                        self._accumulated_output += new_output_content
+                        self._output_queue.put(self._accumulated_output)
+                    else:
+                        # Any packet other than $O.
+                        self._packet_queue.put(packet_match.group(0))
+
+                    # Remove the parsed packet from the receive
+                    # buffer.
+                    self._receive_buffer = self._receive_buffer[
+                        len(packet_match.group(0)):]
+                    if self._logger:
+                        self._logger.debug(
+                            "parsed packet from stub: " +
+                            packet_match.group(0))
+                        self._logger.debug(
+                            "new receive_buffer: " +
+                            self._receive_buffer)
+                else:
+                    # We don't have enough in the receive buffer to make a
+                    # full packet. Stop trying until we read more.
+                    has_more = False
+
+    def _run_method(self):
+        self._receive_buffer = ""
+        self._accumulated_output = ""
+
+        if self._logger:
+            self._logger.info("socket pump starting")
+
+        # Keep looping around until we're asked to stop the thread.
+        while not self._stop_thread:
+            can_read, _, _ = select.select([self._socket], [], [], 0)
+            if can_read and self._socket in can_read:
+                try:
+                    new_bytes = self._socket.recv(4096)
+                    if self._logger and new_bytes and len(new_bytes) > 0:
+                        self._logger.debug(
+                            "pump received bytes: {}".format(new_bytes))
+                except:
+                    # Likely a closed socket. Done with the pump thread.
+                    if self._logger:
+                        self._logger.debug(
+                            "socket read failed, stopping pump read thread")
+                    break
+                self._process_new_bytes(new_bytes)
+
+        if self._logger:
+            self._logger.info("socket pump exiting")
+
+    def get_accumulated_output(self):
+        return self._accumulated_output
+
+    def get_receive_buffer(self):
+        return self._receive_buffer
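A minimal sketch of how the parser splits traffic between the two queues; it is not taken from the change above. It drives _process_new_bytes() directly, so no pump thread runs, and the socket pair exists only to satisfy the constructor's not-None check. It assumes a POSIX host for socket.socketpair(), a Python 2 interpreter (the module decodes $O payloads with str.decode("hex")), and that socket_packet_pump and six are importable; the packets themselves are invented for illustration.

import socket

from socket_packet_pump import SocketPacketPump

# A throwaway socket pair keeps __init__ happy; no bytes travel over it here.
local_sock, remote_sock = socket.socketpair()
pump = SocketPacketPump(local_sock)

# "+" is a transport-level ack, "$OK#9a" is an ordinary response packet, and
# the two $O packets carry hex-encoded stdout ("hi\n", then "bye\n").  The
# checksums happen to be correct, but the regex only requires two hex digits.
pump._process_new_bytes("+$OK#9a$O68690a#bd$O6279650a#23")

print(pump.packet_queue().get(False))       # "+"
print(pump.packet_queue().get(False))       # "$OK#9a"
print(pump.output_queue().get(False))       # "hi\n"
print(pump.output_queue().get(False))       # "hi\nbye\n" (accumulated state)
print(repr(pump.get_accumulated_output()))  # 'hi\nbye\n'

local_sock.close()
remote_sock.close()

Note that each output-queue entry holds the whole accumulated $O text so far, as the class docstring describes, not just the newest chunk.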
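Under the same assumptions, a sketch of the threaded use through the 'with' support, with one end of a socket pair standing in for the debug stub; the packet contents and the settle time are illustrative only.

import socket
import time

from socket_packet_pump import SocketPacketPump

stub_side, test_side = socket.socketpair()

with SocketPacketPump(test_side) as pump:
    # Pretend the stub acked a request and reported a SIGTRAP stop.
    stub_side.sendall("+$T05thread:01;#07")

    # The select loop polls with a zero timeout, so a short sleep gives it
    # plenty of time to pick the bytes up.
    time.sleep(0.2)

    # Drain what was parsed so __exit__ does not warn about leftover
    # queue entries.
    print(pump.packet_queue().get(True, 5))  # "+"
    print(pump.packet_queue().get(True, 5))  # "$T05thread:01;#07"

stub_side.close()
test_side.close()

Draining before the block ends matters because __exit__ prints a warning and a stack trace whenever either queue still has entries.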