Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py')
-rw-r--r--  packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py  843
1 files changed, 843 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
new file mode 100644
index 000000000000..c0ea841e2a03
--- /dev/null
+++ b/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
@@ -0,0 +1,843 @@
+"""Module for supporting unit testing of the lldb-server debug monitor exe.
+"""
+
+from __future__ import print_function
+
+
+
+import binascii
+import os
+import os.path
+import platform
+import re
+import six
+import socket_packet_pump
+import subprocess
+import time
+from lldbsuite.test.lldbtest import *
+
+from six.moves import queue
+
+def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename):
+ """Return the debug monitor exe path given the lldb exe path.
+
+ This method attempts to construct a valid debug monitor exe name
+ from a given lldb exe name. It will return None if the synthesized
+ debug monitor name is not found to exist.
+
+ The debug monitor exe path is synthesized by taking the directory
+ of the lldb exe and replacing the portion of the base name that
+ matches "lldb" (case insensitive) with the value of
+ debug_monitor_basename.
+
+ Args:
+ lldb_exe: the path to an lldb executable.
+
+ debug_monitor_basename: the base name portion of the debug monitor
+ that will replace 'lldb'.
+
+ Returns:
+ A path to the debug monitor exe if it is found to exist; otherwise,
+ returns None.
+
+ """
+ if not lldb_exe:
+ return None
+
+ exe_dir = os.path.dirname(lldb_exe)
+ exe_base = os.path.basename(lldb_exe)
+
+ # we'll rebuild the filename by replacing lldb with
+ # the debug monitor basename, keeping any prefix or suffix in place.
+ regex = re.compile(r"lldb", re.IGNORECASE)
+ new_base = regex.sub(debug_monitor_basename, exe_base)
+
+ debug_monitor_exe = os.path.join(exe_dir, new_base)
+ if os.path.exists(debug_monitor_exe):
+ return debug_monitor_exe
+
+ new_base = regex.sub('LLDB.framework/Versions/A/Resources/' + debug_monitor_basename, exe_base)
+ debug_monitor_exe = os.path.join(exe_dir, new_base)
+ if os.path.exists(debug_monitor_exe):
+ return debug_monitor_exe
+
+ return None
+
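+# A minimal sketch of the name synthesis above (illustrative only; the
+# paths are hypothetical): with lldb_exe="/opt/bin/lldb-3.5" and
+# debug_monitor_basename="lldb-server", the regex substitution yields
+# "/opt/bin/lldb-server-3.5", which is returned only if it exists on disk.
+#
+#   _get_debug_monitor_from_lldb("/opt/bin/lldb-3.5", "lldb-server")
+#   # -> "/opt/bin/lldb-server-3.5" (or None if that file is absent)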
+
+def get_lldb_server_exe():
+ """Return the lldb-server exe path.
+
+ Returns:
+ A path to the lldb-server exe if it is found to exist; otherwise,
+ returns None.
+ """
+ if "LLDB_DEBUGSERVER_PATH" in os.environ:
+ return os.environ["LLDB_DEBUGSERVER_PATH"]
+
+ return _get_debug_monitor_from_lldb(lldbtest_config.lldbExec, "lldb-server")
+
+def get_debugserver_exe():
+ """Return the debugserver exe path.
+
+ Returns:
+ A path to the debugserver exe if it is found to exist; otherwise,
+ returns None.
+ """
+ if "LLDB_DEBUGSERVER_PATH" in os.environ:
+ return os.environ["LLDB_DEBUGSERVER_PATH"]
+
+ return _get_debug_monitor_from_lldb(lldbtest_config.lldbExec, "debugserver")
+
+_LOG_LINE_REGEX = re.compile(r'^(lldb-server|debugserver)\s+<\s*(\d+)>'
+ r'\s+(read|send)\s+packet:\s+(.+)$')
+
+
+def _is_packet_lldb_gdbserver_input(packet_type, llgs_input_is_read):
+ """Return whether a given packet is input for lldb-gdbserver.
+
+ Args:
+ packet_type: a string indicating 'read' or 'send', taken from a
+ gdbremote packet protocol log.
+
+ llgs_input_is_read: True if lldb-gdbserver input (content sent to
+ lldb-gdbserver) appears as 'read' in the packet log entry (i.e. the
+ log was captured on the lldb-gdbserver side); False if such input
+ appears as 'send' (log captured on the lldb side).
+
+ Returns:
+ True if the packet should be considered input for lldb-gdbserver; False
+ otherwise.
+ """
+ if packet_type == 'read':
+ # when llgs is the read side, then a read packet is meant for
+ # input to llgs (when captured from the llgs/debugserver exe).
+ return llgs_input_is_read
+ elif packet_type == 'send':
+ # when llgs is the send side, then a send packet is meant to
+ # be input to llgs (when captured from the lldb exe).
+ return not llgs_input_is_read
+ else:
+ # don't understand what type of packet this is
+ raise "Unknown packet type: {}".format(packet_type)
+
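+# The mapping above reduces to a small truth table (illustrative):
+#
+#   packet_type  llgs_input_is_read  -> input for llgs?
+#   'read'       True                   True   (log captured on llgs side)
+#   'read'       False                  False
+#   'send'       True                   False
+#   'send'       False                  True   (log captured on lldb side)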
+
+def handle_O_packet(context, packet_contents, logger):
+ """Handle O packets."""
+ if not packet_contents:
+ return False
+ elif packet_contents[0] != "O":
+ return False
+ elif packet_contents == "OK":
+ return False
+
+ new_text = gdbremote_hex_decode_string(packet_contents[1:])
+ context["O_content"] += new_text
+ context["O_count"] += 1
+
+ if logger:
+ logger.debug("text: new \"{}\", cumulative: \"{}\"".format(new_text, context["O_content"]))
+
+ return True
+
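+# Example of the $O handling above (illustrative): the payload "O68690a"
+# hex-decodes to "hi\n", which is appended to context["O_content"] and
+# bumps context["O_count"]; a bare "OK" reply is explicitly not output.
+#
+#   ctx = {"O_content": "", "O_count": 0}
+#   handle_O_packet(ctx, "O68690a", None)   # -> True
+#   # ctx == {"O_content": "hi\n", "O_count": 1}
+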
+_STRIP_CHECKSUM_REGEX = re.compile(r'#[0-9a-fA-F]{2}$')
+_STRIP_COMMAND_PREFIX_REGEX = re.compile(r"^\$")
+_STRIP_COMMAND_PREFIX_M_REGEX = re.compile(r"^\$m")
+
+
+def assert_packets_equal(asserter, actual_packet, expected_packet):
+ # Strip off the checksum digits of the packet. When we're in
+ # no-ack mode, the checksum is ignored and should not cause a
+ # packet mismatch.
+ actual_stripped = _STRIP_CHECKSUM_REGEX.sub('', actual_packet)
+ expected_stripped = _STRIP_CHECKSUM_REGEX.sub('', expected_packet)
+ asserter.assertEqual(actual_stripped, expected_stripped)
+
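+# Example (illustrative): because trailing checksums are stripped before
+# comparing, these two packets are considered equal even though their
+# checksum digits differ (harmless in no-ack mode):
+#
+#   assert_packets_equal(self, "$OK#9a", "$OK#00")   # passes
+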
+def expect_lldb_gdbserver_replay(
+ asserter,
+ sock,
+ test_sequence,
+ timeout_seconds,
+ logger=None):
+ """Replay socket communication with lldb-gdbserver and verify responses.
+
+ Args:
+ asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance.
+
+ sock: the TCP socket connected to the lldb-gdbserver exe.
+
+ test_sequence: a GdbRemoteTestSequence instance that describes
+ the messages sent to the gdb remote and the responses
+ expected from it.
+
+ timeout_seconds: any response taking more than this number of
+ seconds will cause an exception to be raised.
+
+ logger: a Python logger instance.
+
+ Returns:
+ The context dictionary from running the given gdbremote
+ protocol sequence. This will contain any of the capture
+ elements specified to any GdbRemoteEntry instances in
+ test_sequence.
+
+ The context will also contain an entry, context["O_content"],
+ which contains the text received from the inferior via $O
+ packets. Tests should not attempt to match $O packets directly,
+ since how many arrive and how much text each one carries are
+ not deterministic.
+
+ context["O_count"] will contain the number of $O packets
+ received.
+ """
+
+ # Ensure we have some work to do.
+ if len(test_sequence.entries) < 1:
+ return {}
+
+ context = {"O_count":0, "O_content":""}
+ with socket_packet_pump.SocketPacketPump(sock, logger) as pump:
+ # Grab the first sequence entry.
+ sequence_entry = test_sequence.entries.pop(0)
+
+ # While we have an active sequence entry, send messages
+ # destined for the stub and collect/match/process responses
+ # expected from the stub.
+ while sequence_entry:
+ if sequence_entry.is_send_to_remote():
+ # This is an entry to send to the remote debug monitor.
+ send_packet = sequence_entry.get_send_packet()
+ if logger:
+ if len(send_packet) == 1 and send_packet[0] == chr(3):
+ packet_desc = "^C"
+ else:
+ packet_desc = send_packet
+ logger.info("sending packet to remote: {}".format(packet_desc))
+ sock.sendall(send_packet)
+ else:
+ # This is an entry expecting to receive content from the remote debug monitor.
+
+ # We'll pull from (and wait on) the queue appropriate for the type of matcher.
+ # We keep separate queues for process output (coming from non-deterministic
+ # $O packet division) and for all other packets.
+ if sequence_entry.is_output_matcher():
+ try:
+ # Grab next entry from the output queue.
+ content = pump.output_queue().get(True, timeout_seconds)
+ except queue.Empty:
+ if logger:
+ logger.warning("timeout waiting for stub output (accumulated output:{})".format(pump.get_accumulated_output()))
+ raise Exception("timed out while waiting for output match (accumulated output: {})".format(pump.get_accumulated_output()))
+ else:
+ try:
+ content = pump.packet_queue().get(True, timeout_seconds)
+ except queue.Empty:
+ if logger:
+ logger.warning("timeout waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
+ raise Exception("timed out while waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer()))
+
+ # Give the sequence entry the opportunity to match the content.
+ # Output matchers might match or pass after more output accumulates.
+ # Other packet types generally must match.
+ asserter.assertIsNotNone(content)
+ context = sequence_entry.assert_match(asserter, content, context=context)
+
+ # Move on to next sequence entry as needed. Some sequence entries support executing multiple
+ # times in different states (for looping over query/response packets).
+ if sequence_entry.is_consumed():
+ if len(test_sequence.entries) > 0:
+ sequence_entry = test_sequence.entries.pop(0)
+ else:
+ sequence_entry = None
+
+ # Fill in the O_content entries.
+ context["O_count"] = 1
+ context["O_content"] = pump.get_accumulated_output()
+
+ return context
+
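+# Typical usage from a test case (a hedged sketch; `sock` and
+# `test_sequence` are assumed to be set up elsewhere by the test):
+#
+#   context = expect_lldb_gdbserver_replay(
+#       self, sock, test_sequence, timeout_seconds=5, logger=logger)
+#   inferior_output = context["O_content"]
+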
+def gdbremote_hex_encode_string(string):
+ output = ''
+ for c in string:
+ output += '{0:02x}'.format(ord(c))
+ return output
+
+def gdbremote_hex_decode_string(string):
+ # binascii works on both Python 2 and 3, unlike str.decode("hex").
+ return binascii.unhexlify(string).decode()
+
+def gdbremote_packet_encode_string(string):
+ checksum = 0
+ for c in string:
+ checksum += ord(c)
+ return '$' + string + '#{0:02x}'.format(checksum % 256)
+
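+# Worked example of the checksum above (illustrative): for "OK",
+# ord('O') + ord('K') == 79 + 75 == 154 == 0x9a, so:
+#
+#   gdbremote_packet_encode_string("OK")   # -> "$OK#9a"
+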
+def build_gdbremote_A_packet(args_list):
+ """Given a list of args, create a properly-formed $A packet containing each arg.
+ """
+ payload = "A"
+
+ # build the arg content
+ arg_index = 0
+ for arg in args_list:
+ # Comma-separate the args.
+ if arg_index > 0:
+ payload += ','
+
+ # Hex-encode the arg.
+ hex_arg = gdbremote_hex_encode_string(arg)
+
+ # Build the A entry.
+ payload += "{},{},{}".format(len(hex_arg), arg_index, hex_arg)
+
+ # Next arg index, please.
+ arg_index += 1
+
+ # return the packetized payload
+ return gdbremote_packet_encode_string(payload)
+
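+# Example of the $A layout above (illustrative): each arg becomes
+# "<hex-length>,<arg-index>,<hex-encoded-arg>". For the hypothetical
+# inferior path "/bin/ls" (14 hex digits, arg index 0):
+#
+#   build_gdbremote_A_packet(["/bin/ls"])
+#   # -> "$A14,0,2f62696e2f6c73#d3"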
+
+def parse_reg_info_response(response_packet):
+ if not response_packet:
+ raise Exception("response_packet cannot be None")
+
+ # Strip off prefix $ and suffix #xx if present.
+ response_packet = _STRIP_COMMAND_PREFIX_REGEX.sub("", response_packet)
+ response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet)
+
+ # Build keyval pairs
+ values = {}
+ for kv in response_packet.split(";"):
+ if len(kv) < 1:
+ continue
+ (key, val) = kv.split(':')
+ values[key] = val
+
+ return values
+
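+# Example (illustrative): a typical qRegisterInfo-style response body is
+# a semicolon-separated list of key:value pairs, e.g.:
+#
+#   parse_reg_info_response("$name:rip;bitsize:64;encoding:uint;format:hex#00")
+#   # -> {"name": "rip", "bitsize": "64", "encoding": "uint", "format": "hex"}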
+
+def parse_threadinfo_response(response_packet):
+ if not response_packet:
+ raise Exception("response_packet cannot be None")
+
+ # Strip off prefix $ and suffix #xx if present.
+ response_packet = _STRIP_COMMAND_PREFIX_M_REGEX.sub("", response_packet)
+ response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet)
+
+ # Return list of thread ids
+ return [int(thread_id_hex, 16) for thread_id_hex in response_packet.split(",") if len(thread_id_hex) > 0]
+
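+# Example (illustrative): a qfThreadInfo-style reply of "$m1f02,1f03#00"
+# (any two hex checksum digits) parses to the thread-id list
+# [0x1f02, 0x1f03]:
+#
+#   parse_threadinfo_response("$m1f02,1f03#00")   # -> [7938, 7939]
+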
+def unpack_endian_binary_string(endian, value_string):
+ """Unpack a gdb-remote binary (post-unescaped, i.e. not escaped) response to an unsigned int given endianness of the inferior."""
+ if not endian:
+ raise Exception("endian cannot be None")
+ if not value_string or len(value_string) < 1:
+ raise Exception("value_string cannot be None or empty")
+
+ if endian == 'little':
+ value = 0
+ i = 0
+ while len(value_string) > 0:
+ value += (ord(value_string[0]) << i)
+ value_string = value_string[1:]
+ i += 8
+ return value
+ elif endian == 'big':
+ value = 0
+ while len(value_string) > 0:
+ value = (value << 8) + ord(value_string[0])
+ value_string = value_string[1:]
+ return value
+ else:
+ # 'pdp' endianness is valid, but parsing support can be added once needed.
+ raise Exception("unsupported endian:{}".format(endian))
+
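+# Example (illustrative): the two-byte string '\x01\x02' unpacks as
+# 0x0201 little-endian and 0x0102 big-endian:
+#
+#   unpack_endian_binary_string('little', '\x01\x02')   # -> 513
+#   unpack_endian_binary_string('big', '\x01\x02')      # -> 258
+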
+def unpack_register_hex_unsigned(endian, value_string):
+ """Unpack a gdb-remote $p-style response to an unsigned int given endianness of inferior."""
+ if not endian:
+ raise Exception("endian cannot be None")
+ if not value_string or len(value_string) < 1:
+ raise Exception("value_string cannot be None or empty")
+
+ if endian == 'little':
+ value = 0
+ i = 0
+ while len(value_string) > 0:
+ value += (int(value_string[0:2], 16) << i)
+ value_string = value_string[2:]
+ i += 8
+ return value
+ elif endian == 'big':
+ return int(value_string, 16)
+ else:
+ # 'pdp' endianness is valid, but parsing support can be added once needed.
+ raise Exception("unsupported endian:{}".format(endian))
+
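+# Example (illustrative): a $p response of "78563412" is the value
+# 0x12345678 when the inferior is little-endian (bytes arrive LSB first)
+# and 0x78563412 when big-endian:
+#
+#   unpack_register_hex_unsigned('little', '78563412')   # -> 0x12345678
+#   unpack_register_hex_unsigned('big', '78563412')      # -> 0x78563412
+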
+def pack_register_hex(endian, value, byte_size=None):
+ """Pack an unsigned int into a gdb-remote register-write style hex string, given the endianness of the inferior."""
+ if not endian:
+ raise Exception("endian cannot be None")
+
+ if endian == 'little':
+ # Create the little-endian return value.
+ retval = ""
+ while value != 0:
+ retval = retval + "{:02x}".format(value & 0xff)
+ value = value >> 8
+ if byte_size:
+ # Add zero-fill to the right/end (MSB side) of the value.
+ retval += "00" * (byte_size - len(retval) // 2)
+ return retval
+
+ elif endian == 'big':
+ # Emit bytes LSB first, prepending so the MSB lands at the front.
+ retval = ""
+ while value != 0:
+ retval = "{:02x}".format(value & 0xff) + retval
+ value = value >> 8
+ if byte_size:
+ # Add zero-fill to the left/front (MSB side) of the value.
+ retval = ("00" * (byte_size - len(retval) // 2)) + retval
+ return retval
+
+ else:
+ # 'pdp' endianness is valid, but parsing support can be added once needed.
+ raise Exception("unsupported endian:{}".format(endian))
+
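+# Example (illustrative): packing is the inverse of the unpacking above;
+# byte_size pads with zero bytes on the MSB side:
+#
+#   pack_register_hex('little', 0x12345678, byte_size=8)
+#   # -> "7856341200000000"
+#   pack_register_hex('big', 0x12345678, byte_size=8)
+#   # -> "0000000012345678"
+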
+class GdbRemoteEntryBase(object):
+ def is_output_matcher(self):
+ return False
+
+class GdbRemoteEntry(GdbRemoteEntryBase):
+
+ def __init__(self, is_send_to_remote=True, exact_payload=None, regex=None, capture=None, expect_captures=None):
+ """Create an entry representing one piece of the I/O to/from a gdb remote debug monitor.
+
+ Args:
+
+ is_send_to_remote: True if this entry is a message to be
+ sent to the gdbremote debug monitor; False if this
+ entry represents text to be matched against the reply
+ from the gdbremote debug monitor.
+
+ exact_payload: if not None, then this packet is an exact
+ send (when sending to the remote) or an exact match of
+ the response from the gdbremote. The checksums are
+ ignored on exact match requests since negotiation of
+ no-ack makes the checksum content essentially
+ undefined.
+
+ regex: currently only valid for receives from gdbremote.
+ When specified (and only if exact_payload is None),
+ indicates the gdbremote response must match the given
+ regex. Match groups in the regex can be used for two
+ different purposes: saving the match (see capture
+ arg), or validating that a match group matches a
+ previously established value (see expect_captures). It
+ is perfectly valid to have just a regex arg and to
+ specify neither capture nor expect_captures args. This
+ arg only makes sense if exact_payload is not
+ specified.
+
+ capture: if specified, is a dictionary of regex match
+ group indices (should start with 1) to variable names
+ that will store the capture group indicated by the
+ index. For example, {1:"thread_id"} will store capture
+ group 1's content in the context dictionary where
+ "thread_id" is the key and the match group value is
+ the value. The value stored off can be used later in an
+ expect_captures expression. This arg only makes sense
+ when regex is specified.
+
+ expect_captures: if specified, is a dictionary of regex
+ match group indices (should start with 1) to variable
+ names, where the match group should match the value
+ existing in the context at the given variable name.
+ For example, {2:"thread_id"} indicates that the second
+ match group must match the value stored under the
+ context's previously stored "thread_id" key. This arg
+ only makes sense when regex is specified.
+ """
+ self._is_send_to_remote = is_send_to_remote
+ self.exact_payload = exact_payload
+ self.regex = regex
+ self.capture = capture
+ self.expect_captures = expect_captures
+
+ def is_send_to_remote(self):
+ return self._is_send_to_remote
+
+ def is_consumed(self):
+ # For now, all packets are consumed after first use.
+ return True
+
+ def get_send_packet(self):
+ if not self.is_send_to_remote():
+ raise Exception("get_send_packet() called on GdbRemoteEntry that is not a send-to-remote packet")
+ if not self.exact_payload:
+ raise Exception("get_send_packet() called on GdbRemoteEntry but it doesn't have an exact payload")
+ return self.exact_payload
+
+ def _assert_exact_payload_match(self, asserter, actual_packet):
+ assert_packets_equal(asserter, actual_packet, self.exact_payload)
+ return None
+
+ def _assert_regex_match(self, asserter, actual_packet, context):
+ # Ensure the actual packet matches from the start of the actual packet.
+ match = self.regex.match(actual_packet)
+ if not match:
+ asserter.fail("regex '{}' failed to match against content '{}'".format(self.regex.pattern, actual_packet))
+
+ if self.capture:
+ # Handle captures.
+ for group_index, var_name in list(self.capture.items()):
+ capture_text = match.group(group_index)
+ # It is okay for capture text to be None - which it will be if it is a group that can match nothing.
+ # The user must be okay with it since the regex itself matched above.
+ context[var_name] = capture_text
+
+ if self.expect_captures:
+ # Handle comparing matched groups to context dictionary entries.
+ for group_index, var_name in list(self.expect_captures.items()):
+ capture_text = match.group(group_index)
+ if not capture_text:
+ raise Exception("No content to expect for group index {}".format(group_index))
+ asserter.assertEqual(capture_text, context[var_name])
+
+ return context
+
+ def assert_match(self, asserter, actual_packet, context=None):
+ # This only makes sense for matching lines coming from the
+ # remote debug monitor.
+ if self.is_send_to_remote():
+ raise Exception("Attempted to match a packet being sent to the remote debug monitor, doesn't make sense.")
+
+ # Create a new context if needed.
+ if not context:
+ context = {}
+
+ # If this is an exact payload, ensure they match exactly,
+ # ignoring the packet checksum which is optional for no-ack
+ # mode.
+ if self.exact_payload:
+ self._assert_exact_payload_match(asserter, actual_packet)
+ return context
+ elif self.regex:
+ return self._assert_regex_match(asserter, actual_packet, context)
+ else:
+ raise Exception("Don't know how to match a remote-sent packet when exact_payload isn't specified.")
+
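+# Example (illustrative): a receive-side entry that matches a stop-reply
+# packet and captures the signal number into the context under a
+# hypothetical "signo" key:
+#
+#   GdbRemoteEntry(
+#       is_send_to_remote=False,
+#       regex=re.compile(r"^\$T([0-9a-fA-F]{2})"),
+#       capture={1: "signo"})
+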
+class MultiResponseGdbRemoteEntry(GdbRemoteEntryBase):
+ """Represents a query/response style packet.
+
+ Assumes the first item is sent to the gdb remote.
+ An end sequence regex indicates the end of the query/response
+ packet sequence. All responses up through (but not including) the
+ end response are stored in a context variable.
+
+ Settings accepted from params:
+
+ next_query or query: required. The typical query packet without the $ prefix or #xx suffix.
+ If there is a special first packet to start the iteration query, see the
+ first_query key.
+
+ first_query: optional. If the first query requires a special query command, specify
+ it with this key. Do not specify the $ prefix or #xx suffix.
+
+ append_iteration_suffix: defaults to False. Specify True if the 0-based iteration
+ index should be appended as a suffix to the command. e.g. qRegisterInfo with
+ this key set true will generate query packets of qRegisterInfo0, qRegisterInfo1,
+ etc.
+
+ end_regex: required. Specifies a compiled regex object that will match the full text
+ of any response that signals an end to the iteration. It must include the
+ initial $ and ending #xx and must match the whole packet.
+
+ save_key: required. Specifies the key within the context where an array will be stored.
+ Each packet received from the gdb remote that does not match the end_regex will get
+ appended to the array stored within the context at that key.
+
+ runaway_response_count: optional. Defaults to 10000. If this many responses are retrieved,
+ assume there is something wrong with either the response collection or the ending
+ detection regex and throw an exception.
+ """
+ def __init__(self, params):
+ self._next_query = params.get("next_query", params.get("query"))
+ if not self._next_query:
+ raise "either next_query or query key must be specified for MultiResponseGdbRemoteEntry"
+
+ self._first_query = params.get("first_query", self._next_query)
+ self._append_iteration_suffix = params.get("append_iteration_suffix", False)
+ self._iteration = 0
+ self._end_regex = params["end_regex"]
+ self._save_key = params["save_key"]
+ self._runaway_response_count = params.get("runaway_response_count", 10000)
+ self._is_send_to_remote = True
+ self._end_matched = False
+
+ def is_send_to_remote(self):
+ return self._is_send_to_remote
+
+ def get_send_packet(self):
+ if not self.is_send_to_remote():
+ raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry that is not in the send state")
+ if self._end_matched:
+ raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")
+
+ # Choose the first or next query for the base payload.
+ if self._iteration == 0 and self._first_query:
+ payload = self._first_query
+ else:
+ payload = self._next_query
+
+ # Append the suffix as needed.
+ if self._append_iteration_suffix:
+ payload += "%x" % self._iteration
+
+ # Keep track of the iteration.
+ self._iteration += 1
+
+ # Now that we've given the query packet, flip the mode to receive/match.
+ self._is_send_to_remote = False
+
+ # Return the result, converted to packet form.
+ return gdbremote_packet_encode_string(payload)
+
+ def is_consumed(self):
+ return self._end_matched
+
+ def assert_match(self, asserter, actual_packet, context=None):
+ # This only makes sense for matching lines coming from the remote debug monitor.
+ if self.is_send_to_remote():
+ raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but state is set to send a query packet.")
+
+ if self._end_matched:
+ raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.")
+
+ # Set up a context as needed.
+ if not context:
+ context = {}
+
+ # Check if the packet matches the end condition.
+ match = self._end_regex.match(actual_packet)
+ if match:
+ # We're done iterating.
+ self._end_matched = True
+ return context
+
+ # Not done iterating - save the packet.
+ context[self._save_key] = context.get(self._save_key, [])
+ context[self._save_key].append(actual_packet)
+
+ # Check for a runaway response cycle.
+ if len(context[self._save_key]) >= self._runaway_response_count:
+ raise Exception("runaway query/response cycle detected: %d responses captured so far. Last response: %s" %
+ (len(context[self._save_key]), context[self._save_key][-1]))
+
+ # Flip the mode to send for generating the query.
+ self._is_send_to_remote = True
+ return context
+
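+# Example (illustrative): a params dict that iterates qRegisterInfo0,
+# qRegisterInfo1, ... until an error or empty reply, collecting each raw
+# response under a hypothetical "reg_info_responses" key:
+#
+#   MultiResponseGdbRemoteEntry({
+#       "query": "qRegisterInfo",
+#       "append_iteration_suffix": True,
+#       "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+#       "save_key": "reg_info_responses"})
+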
+class MatchRemoteOutputEntry(GdbRemoteEntryBase):
+ """Waits for output from the debug monitor to match a regex or time out.
+
+ This entry type tries to match each time new gdb remote output is accumulated
+ using a provided regex. If the output does not match the regex within the
+ given timeframe, the command fails the playback session. If the regex does
+ match, any capture fields are recorded in the context.
+
+ Settings accepted from params:
+
+ regex: required. Specifies a compiled regex object that must either succeed
+ with re.match or re.search (see regex_mode below) within the given timeout
+ (see timeout_seconds below) or cause the playback to fail.
+
+ regex_mode: optional. Available values: "match" or "search". If "match", the entire
+ stub output as collected so far must match the regex. If search, then the regex
+ must match starting somewhere within the output text accumulated thus far.
+ Default: "match" (i.e. the regex must match the entirety of the accumulated output
+ buffer, so unexpected text will generally fail the match).
+
+ capture: optional. If specified, is a dictionary of regex match group indices (should start
+ with 1) to variable names that will store the capture group indicated by the
+ index. For example, {1:"thread_id"} will store capture group 1's content in the
+ context dictionary where "thread_id" is the key and the match group value is
+ the value. The value stored off can be used later in an expect_captures expression.
+ This arg only makes sense when regex is specified.
+ """
+ def __init__(self, regex=None, regex_mode="match", capture=None):
+ self._regex = regex
+ self._regex_mode = regex_mode
+ self._capture = capture
+ self._matched = False
+
+ if not self._regex:
+ raise Exception("regex cannot be None")
+
+ if self._regex_mode not in ["match", "search"]:
+ raise Exception("unsupported regex mode \"{}\": must be \"match\" or \"search\"".format(self._regex_mode))
+
+ def is_output_matcher(self):
+ return True
+
+ def is_send_to_remote(self):
+ # This is always a "wait for remote" command.
+ return False
+
+ def is_consumed(self):
+ return self._matched
+
+ def assert_match(self, asserter, accumulated_output, context):
+ # Validate args.
+ if not accumulated_output:
+ raise Exception("accumulated_output cannot be None")
+ if not context:
+ raise Exception("context cannot be None")
+
+ # Validate that we haven't already matched.
+ if self._matched:
+ raise Exception("invalid state - already matched, attempting to match again")
+
+ # If we don't have any content yet, we don't match.
+ if len(accumulated_output) < 1:
+ return context
+
+ # Check if we match
+ if self._regex_mode == "match":
+ match = self._regex.match(accumulated_output)
+ elif self._regex_mode == "search":
+ match = self._regex.search(accumulated_output)
+ else:
+ raise Exception("Unexpected regex mode: {}".format(self._regex_mode))
+
+ # If we don't match, wait to try again after next $O content, or time out.
+ if not match:
+ # print("re pattern \"{}\" did not match against \"{}\"".format(self._regex.pattern, accumulated_output))
+ return context
+
+ # We do match.
+ self._matched = True
+ # print("re pattern \"{}\" matched against \"{}\"".format(self._regex.pattern, accumulated_output))
+
+ # Collect up any captures into the context.
+ if self._capture:
+ # Handle captures.
+ for group_index, var_name in list(self._capture.items()):
+ capture_text = match.group(group_index)
+ if not capture_text:
+ raise Exception("No content for group index {}".format(group_index))
+ context[var_name] = capture_text
+
+ return context
+
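+# Example (illustrative): wait for the inferior to print a line somewhere
+# in its accumulated $O output, capturing the payload under a
+# hypothetical "message" key:
+#
+#   MatchRemoteOutputEntry(
+#       regex=re.compile(r"hello:\s+(\S+)"),
+#       regex_mode="search",
+#       capture={1: "message"})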
+
+class GdbRemoteTestSequence(object):
+
+ _LOG_LINE_REGEX = re.compile(r'^.*(read|send)\s+packet:\s+(.+)$')
+
+ def __init__(self, logger):
+ self.entries = []
+ self.logger = logger
+
+ def add_log_lines(self, log_lines, remote_input_is_read):
+ for line in log_lines:
+ if isinstance(line, str):
+ # Handle log line import
+ # if self.logger:
+ # self.logger.debug("processing log line: {}".format(line))
+ match = self._LOG_LINE_REGEX.match(line)
+ if match:
+ playback_packet = match.group(2)
+ direction = match.group(1)
+ if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
+ # Handle as something to send to the remote debug monitor.
+ # if self.logger:
+ # self.logger.info("processed packet to send to remote: {}".format(playback_packet))
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=True, exact_payload=playback_packet))
+ else:
+ # Log line represents content to be expected from the remote debug monitor.
+ # if self.logger:
+ # self.logger.info("receiving packet from llgs, should match: {}".format(playback_packet))
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=False, exact_payload=playback_packet))
+ else:
+ raise Exception("failed to interpret log line: {}".format(line))
+ elif isinstance(line, dict):
+ entry_type = line.get("type", "regex_capture")
+ if entry_type == "regex_capture":
+ # Handle more explicit control over details via dictionary.
+ direction = line.get("direction", None)
+ regex = line.get("regex", None)
+ capture = line.get("capture", None)
+ expect_captures = line.get("expect_captures", None)
+
+ # Compile the regex.
+ if regex and isinstance(regex, str):
+ regex = re.compile(regex)
+
+ if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read):
+ # Handle as something to send to the remote debug monitor.
+ # if self.logger:
+ # self.logger.info("processed dict sequence to send to remote")
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=True, regex=regex, capture=capture, expect_captures=expect_captures))
+ else:
+ # Log line represents content to be expected from the remote debug monitor.
+ # if self.logger:
+ # self.logger.info("processed dict sequence to match receiving from remote")
+ self.entries.append(GdbRemoteEntry(is_send_to_remote=False, regex=regex, capture=capture, expect_captures=expect_captures))
+ elif entry_type == "multi_response":
+ self.entries.append(MultiResponseGdbRemoteEntry(line))
+ elif entry_type == "output_match":
+
+ regex = line.get("regex", None)
+ # Compile the regex.
+ if regex and isinstance(regex, str):
+ regex = re.compile(regex)
+
+ regex_mode = line.get("regex_mode", "match")
+ capture = line.get("capture", None)
+ self.entries.append(MatchRemoteOutputEntry(regex=regex, regex_mode=regex_mode, capture=capture))
+ else:
+ raise Exception("unknown entry type \"%s\"" % entry_type)
+
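+# Example (illustrative): building a playback sequence from gdb-remote
+# log lines. With remote_input_is_read=True, 'read' lines are sent to
+# the stub and 'send' lines are matched against its replies:
+#
+#   test_sequence = GdbRemoteTestSequence(logger)
+#   test_sequence.add_log_lines([
+#       "read packet: $QStartNoAckMode#b0",
+#       "send packet: $OK#9a",
+#   ], True)
+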
+def process_is_running(pid, unknown_value=True):
+ """If possible, validate that the given pid represents a running process on the local system.
+
+ Args:
+
+ pid: an OS-specific representation of a process id. Should be an integral value.
+
+ unknown_value: value used when we cannot determine how to check running local
+ processes on the OS.
+
+ Returns:
+
+ If we can figure out how to check running process ids on the given OS:
+ return True if the process is running, or False otherwise.
+
+ If we don't know how to check running process ids on the given OS:
+ return the value provided by the unknown_value arg.
+ """
+ if not isinstance(pid, six.integer_types):
+ raise Exception("pid must be an integral type (actual type: %s)" % str(type(pid)))
+
+ process_ids = []
+
+ if lldb.remote_platform:
+ # Don't know how to get list of running process IDs on a remote
+ # platform
+ return unknown_value
+ elif platform.system() in ['Darwin', 'Linux', 'FreeBSD', 'NetBSD']:
+ # Build the list of running process ids
+ output = subprocess.check_output(
+ "ps ax | awk '{ print $1; }'", shell=True).decode("utf-8")
+ text_process_ids = output.split('\n')[1:]
+ # Convert text pids to ints
+ process_ids = [int(text_pid) for text_pid in text_process_ids if text_pid != '']
+ # elif {your_platform_here}:
+ # fill in process_ids as a list of int type process IDs running on
+ # the local system.
+ else:
+ # Don't know how to get list of running process IDs on this
+ # OS, so return the "don't know" value.
+ return unknown_value
+
+ # Check if the pid is in the process_ids
+ return pid in process_ids
+
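+# Example (illustrative): the current test process is always running, so
+# on a supported local OS this returns True; on unsupported or remote
+# platforms it returns unknown_value instead:
+#
+#   process_is_running(os.getpid())   # -> True
+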
+if __name__ == '__main__':
+ EXE_PATH = get_lldb_server_exe()
+ if EXE_PATH:
+ print("lldb-server path detected: {}".format(EXE_PATH))
+ else:
+ print("lldb-server could not be found")