aboutsummaryrefslogtreecommitdiff
path: root/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
diff options
context:
space:
mode:
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py')
-rw-r--r--packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py685
1 files changed, 464 insertions, 221 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
index d63ddbe39998..ef6a0a21aaeb 100644
--- a/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
+++ b/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
@@ -5,7 +5,6 @@ Base class for gdb-remote test cases.
from __future__ import print_function
-
import errno
import os
import os.path
@@ -24,9 +23,11 @@ from lldbsuite.test.lldbtest import *
from lldbgdbserverutils import *
import logging
+
class _ConnectionRefused(IOError):
pass
+
class GdbRemoteTestCaseBase(TestBase):
NO_DEBUG_INFO_TESTCASE = True
@@ -35,29 +36,34 @@ class GdbRemoteTestCaseBase(TestBase):
_GDBREMOTE_KILL_PACKET = "$k#6b"
- # Start the inferior separately, attach to the inferior on the stub command line.
+ # Start the inferior separately, attach to the inferior on the stub
+ # command line.
_STARTUP_ATTACH = "attach"
- # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
+ # Start the inferior separately, start the stub without attaching, allow
+ # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
_STARTUP_ATTACH_MANUALLY = "attach_manually"
- # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
+ # Start the stub, and launch the inferior with an $A packet via the
+ # initial packet stream.
_STARTUP_LAUNCH = "launch"
- # GDB Signal numbers that are not target-specific used for common exceptions
- TARGET_EXC_BAD_ACCESS = 0x91
+ # GDB Signal numbers that are not target-specific used for common
+ # exceptions
+ TARGET_EXC_BAD_ACCESS = 0x91
TARGET_EXC_BAD_INSTRUCTION = 0x92
- TARGET_EXC_ARITHMETIC = 0x93
- TARGET_EXC_EMULATION = 0x94
- TARGET_EXC_SOFTWARE = 0x95
- TARGET_EXC_BREAKPOINT = 0x96
+ TARGET_EXC_ARITHMETIC = 0x93
+ TARGET_EXC_EMULATION = 0x94
+ TARGET_EXC_SOFTWARE = 0x95
+ TARGET_EXC_BREAKPOINT = 0x96
_verbose_log_handler = None
- _log_formatter = logging.Formatter(fmt='%(asctime)-15s %(levelname)-8s %(message)s')
+ _log_formatter = logging.Formatter(
+ fmt='%(asctime)-15s %(levelname)-8s %(message)s')
def setUpBaseLogging(self):
self.logger = logging.getLogger(__name__)
if len(self.logger.handlers) > 0:
- return # We have set up this handler already
+ return # We have set up this handler already
self.logger.propagate = False
self.logger.setLevel(logging.DEBUG)
@@ -68,11 +74,11 @@ class GdbRemoteTestCaseBase(TestBase):
handler.setFormatter(self._log_formatter)
self.logger.addHandler(handler)
-
def isVerboseLoggingRequested(self):
# We will report our detailed logs if the user requested that the "gdb-remote" channel is
# logged.
- return any(("gdb-remote" in channel) for channel in lldbtest_config.channels)
+ return any(("gdb-remote" in channel)
+ for channel in lldbtest_config.channels)
def setUp(self):
TestBase.setUp(self)
@@ -83,7 +89,8 @@ class GdbRemoteTestCaseBase(TestBase):
if self.isVerboseLoggingRequested():
# If requested, full logs go to a log file
- self._verbose_log_handler = logging.FileHandler(self.log_basename + "-host.log")
+ self._verbose_log_handler = logging.FileHandler(
+ self.log_basename + "-host.log")
self._verbose_log_handler.setFormatter(self._log_formatter)
self._verbose_log_handler.setLevel(logging.DEBUG)
self.logger.addHandler(self._verbose_log_handler)
@@ -100,7 +107,8 @@ class GdbRemoteTestCaseBase(TestBase):
url_pattern = '(.+)://\[?(.+?)\]?/.*'
else:
url_pattern = '(.+)://(.+):\d+'
- scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
+ scheme, host = re.match(
+ url_pattern, configuration.lldb_platform_url).groups()
if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
self.stub_device = host
self.stub_hostname = 'localhost'
@@ -122,21 +130,24 @@ class GdbRemoteTestCaseBase(TestBase):
def setUpServerLogging(self, is_llgs):
if len(lldbtest_config.channels) == 0:
- return # No logging requested
+ return # No logging requested
if lldb.remote_platform:
- log_file = lldbutil.join_remote_paths(lldb.remote_platform.GetWorkingDirectory(), "server.log")
+ log_file = lldbutil.join_remote_paths(
+ lldb.remote_platform.GetWorkingDirectory(), "server.log")
else:
log_file = self.getLocalServerLogFile()
if is_llgs:
self.debug_monitor_extra_args.append("--log-file=" + log_file)
- self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))
+ self.debug_monitor_extra_args.append(
+ "--log-channels={}".format(":".join(lldbtest_config.channels)))
else:
- self.debug_monitor_extra_args = ["--log-file=" + self.log_file, "--log-flags=0x800000"]
+ self.debug_monitor_extra_args = [
+ "--log-file=" + log_file, "--log-flags=0x800000"]
def get_next_port(self):
- return 12000 + random.randint(0,3999)
+ return 12000 + random.randint(0, 3999)
def reset_test_sequence(self):
self.test_sequence = GdbRemoteTestSequence(self.logger)
@@ -149,7 +160,8 @@ class GdbRemoteTestCaseBase(TestBase):
# Create the named pipe.
os.mkfifo(named_pipe_path)
- # Open the read side of the pipe in non-blocking mode. This will return right away, ready or not.
+ # Open the read side of the pipe in non-blocking mode. This will
+ # return right away, ready or not.
named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
# Create the file for the named pipe. Note this will follow semantics of
@@ -177,7 +189,9 @@ class GdbRemoteTestCaseBase(TestBase):
try:
os.rmdir(temp_dir)
except:
- print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))
+ print(
+ "failed to delete temp dir: {}, directory contents: '{}'".format(
+ temp_dir, os.listdir(temp_dir)))
None
# Add the shutdown hook to clean up the named pipe.
@@ -190,14 +204,23 @@ class GdbRemoteTestCaseBase(TestBase):
def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
# Wait for something to read with a max timeout.
- (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
- self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
- self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")
+ (ready_readers, _, _) = select.select(
+ [self.named_pipe_fd], [], [], read_timeout_seconds)
+ self.assertIsNotNone(
+ ready_readers,
+ "write side of pipe has not written anything - stub isn't writing to pipe.")
+ self.assertNotEqual(
+ len(ready_readers),
+ 0,
+ "write side of pipe has not written anything - stub isn't writing to pipe.")
# Read the port from the named pipe.
stub_port_raw = self.named_pipe.read()
self.assertIsNotNone(stub_port_raw)
- self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")
+ self.assertNotEqual(
+ len(stub_port_raw),
+ 0,
+ "no content to read on pipe")
# Trim null byte, convert to int.
stub_port_raw = stub_port_raw[:-1]
@@ -212,15 +235,24 @@ class GdbRemoteTestCaseBase(TestBase):
use_named_pipe = False
# Grab the ppid from /proc/[shell pid]/stat
- err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
- self.assertTrue(err.Success() and retcode == 0,
- "Failed to read file /proc/$$/stat: %s, retcode: %d" % (err.GetCString(), retcode))
+ err, retcode, shell_stat = self.run_platform_command(
+ "cat /proc/$$/stat")
+ self.assertTrue(
+ err.Success() and retcode == 0,
+ "Failed to read file /proc/$$/stat: %s, retcode: %d" %
+ (err.GetCString(),
+ retcode))
# [pid] ([executable]) [state] [*ppid*]
pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
- err, retcode, ls_output = self.run_platform_command("ls -l /proc/%s/exe" % pid)
- self.assertTrue(err.Success() and retcode == 0,
- "Failed to read file /proc/%s/exe: %s, retcode: %d" % (pid, err.GetCString(), retcode))
+ err, retcode, ls_output = self.run_platform_command(
+ "ls -l /proc/%s/exe" % pid)
+ self.assertTrue(
+ err.Success() and retcode == 0,
+ "Failed to read file /proc/%s/exe: %s, retcode: %d" %
+ (pid,
+ err.GetCString(),
+ retcode))
exe = ls_output.split()[-1]
# If the binary has been deleted, the link name has " (deleted)" appended.
@@ -235,7 +267,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.setUpServerLogging(is_llgs=True)
if use_named_pipe:
- (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
+ (self.named_pipe_path, self.named_pipe,
+ self.named_pipe_fd) = self.create_named_pipe()
def init_debugserver_test(self, use_named_pipe=True):
self.debug_monitor_exe = get_debugserver_exe()
@@ -243,17 +276,19 @@ class GdbRemoteTestCaseBase(TestBase):
self.skipTest("debugserver exe not found")
self.setUpServerLogging(is_llgs=False)
if use_named_pipe:
- (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
+ (self.named_pipe_path, self.named_pipe,
+ self.named_pipe_fd) = self.create_named_pipe()
# The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
# when the process truly dies.
self.stub_sends_two_stop_notifications_on_kill = True
def forward_adb_port(self, source, target, direction, device):
- adb = [ 'adb' ] + ([ '-s', device ] if device else []) + [ direction ]
+ adb = ['adb'] + (['-s', device] if device else []) + [direction]
+
def remove_port_forward():
- subprocess.call(adb + [ "--remove", "tcp:%d" % source])
+ subprocess.call(adb + ["--remove", "tcp:%d" % source])
- subprocess.call(adb + [ "tcp:%d" % source, "tcp:%d" % target])
+ subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
self.addTearDownHook(remove_port_forward)
def _verify_socket(self, sock):
@@ -265,12 +300,12 @@ class GdbRemoteTestCaseBase(TestBase):
# connection again.
triple = self.dbg.GetSelectedPlatform().GetTriple()
if not re.match(".*-.*-.*-android", triple):
- return # Not android.
+ return # Not android.
can_read, _, _ = select.select([sock], [], [], 0.1)
if sock not in can_read:
- return # Data is not available, but the connection is alive.
+ return # Data is not available, but the connection is alive.
if len(sock.recv(1, socket.MSG_PEEK)) == 0:
- raise _ConnectionRefused() # Got EOF, connection dropped.
+ raise _ConnectionRefused() # Got EOF, connection dropped.
def create_socket(self):
sock = socket.socket()
@@ -278,9 +313,16 @@ class GdbRemoteTestCaseBase(TestBase):
triple = self.dbg.GetSelectedPlatform().GetTriple()
if re.match(".*-.*-.*-android", triple):
- self.forward_adb_port(self.port, self.port, "forward", self.stub_device)
-
- logger.info("Connecting to debug monitor on %s:%d", self.stub_hostname, self.port)
+ self.forward_adb_port(
+ self.port,
+ self.port,
+ "forward",
+ self.stub_device)
+
+ logger.info(
+ "Connecting to debug monitor on %s:%d",
+ self.stub_hostname,
+ self.port)
connect_info = (self.stub_hostname, self.port)
try:
sock.connect(connect_info)
@@ -295,12 +337,16 @@ class GdbRemoteTestCaseBase(TestBase):
# send the kill packet so lldb-server shuts down gracefully
sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
except:
- logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to send kill packet to debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
try:
sock.close()
except:
- logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to close socket to debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_socket)
@@ -319,9 +365,11 @@ class GdbRemoteTestCaseBase(TestBase):
def get_debug_monitor_command_line_args(self, attach_pid=None):
if lldb.remote_platform:
- commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
+ commandline_args = self.debug_monitor_extra_args + \
+ ["*:{}".format(self.port)]
else:
- commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]
+ commandline_args = self.debug_monitor_extra_args + \
+ ["localhost:{}".format(self.port)]
if attach_pid:
commandline_args += ["--attach=%d" % attach_pid]
@@ -331,14 +379,19 @@ class GdbRemoteTestCaseBase(TestBase):
def launch_debug_monitor(self, attach_pid=None, logfile=None):
# Create the command line.
- commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)
+ commandline_args = self.get_debug_monitor_command_line_args(
+ attach_pid=attach_pid)
# Start the server.
- server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
+ server = self.spawnSubprocess(
+ self.debug_monitor_exe,
+ commandline_args,
+ install_remote=False)
self.addTearDownHook(self.cleanupSubprocesses)
self.assertIsNotNone(server)
- # If we're receiving the stub's listening port from the named pipe, do that here.
+ # If we're receiving the stub's listening port from the named pipe, do
+ # that here.
if self.named_pipe:
self.port = self.get_stub_port_from_named_socket()
@@ -354,7 +407,9 @@ class GdbRemoteTestCaseBase(TestBase):
try:
server.terminate()
except:
- logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate server for debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_debug_monitor)
# Schedule debug monitor to be shut down during teardown.
@@ -374,11 +429,14 @@ class GdbRemoteTestCaseBase(TestBase):
# Schedule debug monitor to be shut down during teardown.
logger = self.logger
+
def shutdown_debug_monitor():
try:
server.terminate()
except:
- logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate server for debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_debug_monitor)
connect_attemps = 0
@@ -387,7 +445,7 @@ class GdbRemoteTestCaseBase(TestBase):
while connect_attemps < MAX_CONNECT_ATTEMPTS:
# Create a socket to talk to the server
try:
- logger.info("Connect attempt %d", connect_attemps+1)
+ logger.info("Connect attempt %d", connect_attemps + 1)
self.sock = self.create_socket()
return server
except _ConnectionRefused as serr:
@@ -400,18 +458,27 @@ class GdbRemoteTestCaseBase(TestBase):
server.terminate()
# Increment attempts.
- print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
+ print(
+ "connect to debug monitor on port %d failed, attempt #%d of %d" %
+ (self.port, attempts + 1, MAX_ATTEMPTS))
attempts += 1
- # And wait a random length of time before next attempt, to avoid collisions.
- time.sleep(random.randint(1,5))
-
+ # And wait a random length of time before next attempt, to avoid
+ # collisions.
+ time.sleep(random.randint(1, 5))
+
# Now grab a new port number.
self.port = self.get_next_port()
- raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
+ raise Exception(
+ "failed to create a socket to the launched debug monitor after %d tries" %
+ attempts)
- def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
+ def launch_process_for_attach(
+ self,
+ inferior_args=None,
+ sleep_seconds=3,
+ exe_path=None):
# We're going to start a child process that the debug monitor stub can later attach to.
# This process needs to be started so that it just hangs around for a while. We'll
# have it sleep.
@@ -425,15 +492,22 @@ class GdbRemoteTestCaseBase(TestBase):
args.append("sleep:%d" % sleep_seconds)
inferior = self.spawnSubprocess(exe_path, args)
+
def shutdown_process_for_attach():
try:
inferior.terminate()
except:
- logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate inferior process for attach: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_process_for_attach)
return inferior
- def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
+ def prep_debug_monitor_and_inferior(
+ self,
+ inferior_args=None,
+ inferior_sleep_seconds=3,
+ inferior_exe_path=None):
"""Prep the debug monitor, the inferior, and the expected packet stream.
Handle the separate cases of using the debug monitor in attach-to-inferior mode
@@ -458,11 +532,15 @@ class GdbRemoteTestCaseBase(TestBase):
if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
# Launch the process that we'll use as the inferior.
- inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
+ inferior = self.launch_process_for_attach(
+ inferior_args=inferior_args,
+ sleep_seconds=inferior_sleep_seconds,
+ exe_path=inferior_exe_path)
self.assertIsNotNone(inferior)
self.assertTrue(inferior.pid > 0)
if self._inferior_startup == self._STARTUP_ATTACH:
- # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
+ # In this case, we want the stub to attach via the command
+ # line, so set the command line attach pid here.
attach_pid = inferior.pid
if self._inferior_startup == self._STARTUP_LAUNCH:
@@ -471,11 +549,15 @@ class GdbRemoteTestCaseBase(TestBase):
inferior_exe_path = os.path.abspath("a.out")
if lldb.remote_platform:
- remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
+ remote_path = lldbutil.append_to_process_working_directory(
+ os.path.basename(inferior_exe_path))
remote_file_spec = lldb.SBFileSpec(remote_path, False)
- err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
+ err = lldb.remote_platform.Install(lldb.SBFileSpec(
+ inferior_exe_path, True), remote_file_spec)
if err.Fail():
- raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
+ raise Exception(
+ "remote_platform.Install('%s', '%s') failed: %s" %
+ (inferior_exe_path, remote_path, err))
inferior_exe_path = remote_path
launch_args = [inferior_exe_path]
@@ -491,13 +573,18 @@ class GdbRemoteTestCaseBase(TestBase):
if self._inferior_startup == self._STARTUP_LAUNCH:
self.add_verified_launch_packets(launch_args)
- return {"inferior":inferior, "server":server}
+ return {"inferior": inferior, "server": server}
- def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
+ def expect_socket_recv(
+ self,
+ sock,
+ expected_content_regex,
+ timeout_seconds):
response = ""
timeout_time = time.time() + timeout_seconds
- while not expected_content_regex.match(response) and time.time() < timeout_time:
+ while not expected_content_regex.match(
+ response) and time.time() < timeout_time:
can_read, _, _ = select.select([sock], [], [], timeout_seconds)
if can_read and sock in can_read:
recv_bytes = sock.recv(4096)
@@ -514,7 +601,8 @@ class GdbRemoteTestCaseBase(TestBase):
_, can_write, _ = select.select([], [sock], [], timeout_seconds)
if can_write and sock in can_write:
written_byte_count = sock.send(request_bytes_remaining)
- request_bytes_remaining = request_bytes_remaining[written_byte_count:]
+ request_bytes_remaining = request_bytes_remaining[
+ written_byte_count:]
self.assertEqual(len(request_bytes_remaining), 0)
def do_handshake(self, stub_socket, timeout_seconds=5):
@@ -527,7 +615,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
# Receive the ack and "OK"
- self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
+ self.expect_socket_recv(stub_socket, re.compile(
+ r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
# Send the final ack.
self.expect_socket_send(stub_socket, "+", timeout_seconds)
@@ -553,12 +642,12 @@ class GdbRemoteTestCaseBase(TestBase):
self.test_sequence.add_log_lines(
["read packet: $QThreadSuffixSupported#e4",
"send packet: $OK#00",
- ], True)
+ ], True)
def add_process_info_collection_packets(self):
self.test_sequence.add_log_lines(
["read packet: $qProcessInfo#dc",
- { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
True)
_KNOWN_PROCESS_INFO_KEYS = [
@@ -574,8 +663,9 @@ class GdbRemoteTestCaseBase(TestBase):
"triple",
"vendor",
"endian",
+ "elf_abi",
"ptrsize"
- ]
+ ]
def parse_process_info_response(self, context):
# Ensure we have a process info response.
@@ -584,7 +674,9 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(process_info_raw)
# Pull out key:value; pairs.
- process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
+ process_info_dict = {
+ match.group(1): match.group(2) for match in re.finditer(
+ r"([^:]+):([^;]+);", process_info_raw)}
# Validate keys are known.
for (key, val) in list(process_info_dict.items()):
@@ -595,9 +687,9 @@ class GdbRemoteTestCaseBase(TestBase):
def add_register_info_collection_packets(self):
self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
- "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
- "save_key":"reg_info_responses" } ],
+ [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
+ "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+ "save_key": "reg_info_responses"}],
True)
def parse_register_info_packets(self, context):
@@ -606,13 +698,19 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(reg_info_responses)
# Parse register infos.
- return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
+ return [parse_reg_info_response(reg_info_response)
+ for reg_info_response in reg_info_responses]
def expect_gdbremote_sequence(self, timeout_seconds=None):
if not timeout_seconds:
timeout_seconds = self._TIMEOUT_SECONDS
- return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence,
- self._pump_queues, timeout_seconds, self.logger)
+ return expect_lldb_gdbserver_replay(
+ self,
+ self.sock,
+ self.test_sequence,
+ self._pump_queues,
+ timeout_seconds,
+ self.logger)
_KNOWN_REGINFO_KEYS = [
"name",
@@ -627,7 +725,9 @@ class GdbRemoteTestCaseBase(TestBase):
"dwarf",
"generic",
"container-regs",
- "invalidate-regs"
+ "invalidate-regs",
+ "dynamic_size_dwarf_expr_bytes",
+ "dynamic_size_dwarf_len"
]
def assert_valid_reg_info(self, reg_info):
@@ -667,7 +767,7 @@ class GdbRemoteTestCaseBase(TestBase):
def add_query_memory_region_packets(self, address):
self.test_sequence.add_log_lines(
["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
True)
def parse_key_val_dict(self, key_val_text, allow_dupes=True):
@@ -678,13 +778,15 @@ class GdbRemoteTestCaseBase(TestBase):
val = match.group(2)
if key in kv_dict:
if allow_dupes:
- if type(kv_dict[key]) == list:
+ if isinstance(kv_dict[key], list):
kv_dict[key].append(val)
else:
# Promote to list
kv_dict[key] = [kv_dict[key], val]
else:
- self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
+ self.fail(
+ "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
+ key, val, key_val_text, kv_dict))
else:
kv_dict[key] = val
return kv_dict
@@ -694,17 +796,25 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(context.get("memory_region_response"))
# Pull out key:value; pairs.
- mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
+ mem_region_dict = self.parse_key_val_dict(
+ context.get("memory_region_response"))
# Validate keys are known.
for (key, val) in list(mem_region_dict.items()):
- self.assertTrue(key in ["start", "size", "permissions", "error"])
+ self.assertTrue(
+ key in [
+ "start",
+ "size",
+ "permissions",
+ "name",
+ "error"])
self.assertIsNotNone(val)
# Return the dictionary of key-value pairs for the memory region.
return mem_region_dict
- def assert_address_within_memory_region(self, test_address, mem_region_dict):
+ def assert_address_within_memory_region(
+ self, test_address, mem_region_dict):
self.assertIsNotNone(mem_region_dict)
self.assertTrue("start" in mem_region_dict)
self.assertTrue("size" in mem_region_dict)
@@ -714,15 +824,25 @@ class GdbRemoteTestCaseBase(TestBase):
range_end = range_start + range_size
if test_address < range_start:
- self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+ self.fail(
+ "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
+ test_address,
+ range_start,
+ range_end,
+ range_size))
elif test_address >= range_end:
- self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+ self.fail(
+ "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
+ test_address,
+ range_start,
+ range_end,
+ range_size))
def add_threadinfo_collection_packets(self):
self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
- "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
- "save_key":"threadinfo_responses" } ],
+ [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
+ "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
+ "save_key": "threadinfo_responses"}],
True)
def parse_threadinfo_packets(self, context):
@@ -760,35 +880,44 @@ class GdbRemoteTestCaseBase(TestBase):
return threads
- def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
+ def add_set_breakpoint_packets(
+ self,
+ address,
+ do_continue=True,
+ breakpoint_kind=1):
self.test_sequence.add_log_lines(
- [# Set the breakpoint.
- "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could set it.
- "send packet: $OK#00",
- ], True)
+ [ # Set the breakpoint.
+ "read packet: $Z0,{0:x},{1}#00".format(
+ address, breakpoint_kind),
+ # Verify the stub could set it.
+ "send packet: $OK#00",
+ ], True)
if (do_continue):
self.test_sequence.add_log_lines(
- [# Continue the inferior.
- "read packet: $c#63",
- # Expect a breakpoint stop report.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
- ], True)
+ [ # Continue the inferior.
+ "read packet: $c#63",
+ # Expect a breakpoint stop report.
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
+ "capture": {1: "stop_signo",
+ 2: "stop_thread_id"}},
+ ], True)
def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
self.test_sequence.add_log_lines(
- [# Remove the breakpoint.
- "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could unset it.
- "send packet: $OK#00",
+ [ # Remove the breakpoint.
+ "read packet: $z0,{0:x},{1}#00".format(
+ address, breakpoint_kind),
+ # Verify the stub could unset it.
+ "send packet: $OK#00",
], True)
def add_qSupported_packets(self):
self.test_sequence.add_log_lines(
["read packet: $qSupported#00",
- {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
- ], True)
+ {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
+ ], True)
_KNOWN_QSUPPORTED_STUB_FEATURES = [
"augmented-libraries-svr4-read",
@@ -821,23 +950,27 @@ class GdbRemoteTestCaseBase(TestBase):
supported_dict[key] = val
else:
if len(key) < 2:
- raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
+ raise Exception(
+ "singular stub feature is too short: must be stub_feature{+,-,?}")
supported_type = key[-1]
key = key[:-1]
if not supported_type in ["+", "-", "?"]:
- raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
- supported_dict[key] = supported_type
+ raise Exception(
+ "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
+ supported_dict[key] = supported_type
# Ensure we know the supported element
- if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
- raise Exception("unknown qSupported stub feature reported: %s" % key)
+ if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
+ raise Exception(
+ "unknown qSupported stub feature reported: %s" %
+ key)
return supported_dict
def run_process_then_stop(self, run_seconds=1):
# Tell the stub to continue.
self.test_sequence.add_log_lines(
- ["read packet: $vCont;c#a8"],
- True)
+ ["read packet: $vCont;c#a8"],
+ True)
context = self.expect_gdbremote_sequence()
# Wait for run_seconds.
@@ -847,7 +980,7 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
["read packet: {}".format(chr(3)),
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
+ {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -857,18 +990,21 @@ class GdbRemoteTestCaseBase(TestBase):
def select_modifiable_register(self, reg_infos):
"""Find a register that can be read/written freely."""
- PREFERRED_REGISTER_NAMES = set(["rax",])
+ PREFERRED_REGISTER_NAMES = set(["rax", ])
- # First check for the first register from the preferred register name set.
+ # First check for the first register from the preferred register name
+ # set.
alternative_register_index = None
self.assertIsNotNone(reg_infos)
for reg_info in reg_infos:
- if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
+ if ("name" in reg_info) and (
+ reg_info["name"] in PREFERRED_REGISTER_NAMES):
# We found a preferred register. Use it.
return reg_info["lldb_register_index"]
if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
- # A frame pointer register will do as a register to modify temporarily.
+ # A frame pointer register will do as a register to modify
+ # temporarily.
alternative_register_index = reg_info["lldb_register_index"]
# We didn't find a preferred register. Return whatever alternative register
@@ -901,7 +1037,8 @@ class GdbRemoteTestCaseBase(TestBase):
def find_generic_register_with_name(self, reg_infos, generic_name):
self.assertIsNotNone(reg_infos)
for reg_info in reg_infos:
- if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
+ if ("generic" in reg_info) and (
+ reg_info["generic"] == generic_name):
return reg_info
return None
@@ -912,13 +1049,13 @@ class GdbRemoteTestCaseBase(TestBase):
if encoded_bytes[i] == "}":
# Handle escaped char.
self.assertTrue(i + 1 < len(encoded_bytes))
- decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20)
- i +=2
+ decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
+ i += 2
elif encoded_bytes[i] == "*":
# Handle run length encoding.
self.assertTrue(len(decoded_bytes) > 0)
self.assertTrue(i + 1 < len(encoded_bytes))
- repeat_count = ord(encoded_bytes[i+1]) - 29
+ repeat_count = ord(encoded_bytes[i + 1]) - 29
decoded_bytes += decoded_bytes[-1] * repeat_count
i += 2
else:
@@ -955,7 +1092,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertFalse(key in auxv_dict)
auxv_dict[key] = value
- self.fail("should not reach here - implies required double zero entry not found")
+ self.fail(
+ "should not reach here - implies required double zero entry not found")
return auxv_dict
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
@@ -967,10 +1105,21 @@ class GdbRemoteTestCaseBase(TestBase):
while not done:
# Grab the next iteration of data.
self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
- {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
- ], True)
+ self.test_sequence.add_log_lines(
+ [
+ "read packet: ${}{:x},{:x}:#00".format(
+ command_prefix,
+ offset,
+ chunk_length),
+ {
+ "direction": "send",
+ "regex": re.compile(
+ r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
+ re.MULTILINE | re.DOTALL),
+ "capture": {
+ 1: "response_type",
+ 2: "content_raw"}}],
+ True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -997,25 +1146,32 @@ class GdbRemoteTestCaseBase(TestBase):
# Send the intterupt.
"read packet: {}".format(chr(3)),
# And wait for the stop notification.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
- ], True)
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
+ "capture": {1: "stop_signo",
+ 2: "stop_key_val_text"}},
+ ], True)
def parse_interrupt_packets(self, context):
self.assertIsNotNone(context.get("stop_signo"))
self.assertIsNotNone(context.get("stop_key_val_text"))
- return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"]))
+ return (int(context["stop_signo"], 16), self.parse_key_val_dict(
+ context["stop_key_val_text"]))
def add_QSaveRegisterState_packets(self, thread_id):
if thread_id:
# Use the thread suffix form.
- request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
+ request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
+ thread_id)
else:
request = "read packet: $QSaveRegisterState#00"
-
- self.test_sequence.add_log_lines([
- request,
- {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
- ], True)
+
+ self.test_sequence.add_log_lines([request,
+ {"direction": "send",
+ "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
+ "capture": {1: "save_response"}},
+ ],
+ True)
def parse_QSaveRegisterState_response(self, context):
self.assertIsNotNone(context)
@@ -1032,16 +1188,19 @@ class GdbRemoteTestCaseBase(TestBase):
def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
if thread_id:
# Use the thread suffix form.
- request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
+ request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
+ save_id, thread_id)
else:
- request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
+ request = "read packet: $QRestoreRegisterState:{}#00".format(
+ save_id)
self.test_sequence.add_log_lines([
request,
"send packet: $OK#00"
- ], True)
+ ], True)
- def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
+ def flip_all_bits_in_each_register_value(
+ self, reg_infos, endian, thread_id=None):
self.assertIsNotNone(reg_infos)
successful_writes = 0
@@ -1049,16 +1208,18 @@ class GdbRemoteTestCaseBase(TestBase):
for reg_info in reg_infos:
# Use the lldb register index added to the reg info. We're not necessarily
- # working off a full set of register infos, so an inferred register index could be wrong.
+ # working off a full set of register infos, so an inferred register
+ # index could be wrong.
reg_index = reg_info["lldb_register_index"]
self.assertIsNotNone(reg_index)
- reg_byte_size = int(reg_info["bitsize"])/8
+ reg_byte_size = int(reg_info["bitsize"]) / 8
self.assertTrue(reg_byte_size > 0)
# Handle thread suffix.
if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
+ p_request = "read packet: $p{:x};thread:{:x}#00".format(
+ reg_index, thread_id)
else:
p_request = "read packet: $p{:x}#00".format(reg_index)
@@ -1066,15 +1227,16 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Verify the response length.
p_response = context.get("p_response")
self.assertIsNotNone(p_response)
- initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
+ initial_reg_value = unpack_register_hex_unsigned(
+ endian, p_response)
# Flip the value by xoring with all 1s
all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
@@ -1083,16 +1245,22 @@ class GdbRemoteTestCaseBase(TestBase):
# Handle thread suffix for P.
if thread_id:
- P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
+ P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
+ reg_index, pack_register_hex(
+ endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
else:
- P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
+ P_request = "read packet: $P{:x}={}#00".format(
+ reg_index, pack_register_hex(
+ endian, flipped_bits_int, byte_size=reg_byte_size))
# Write the flipped value to the register.
self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- P_request,
- { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
- ], True)
+ self.test_sequence.add_log_lines([P_request,
+ {"direction": "send",
+ "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
+ "capture": {1: "P_response"}},
+ ],
+ True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1107,25 +1275,27 @@ class GdbRemoteTestCaseBase(TestBase):
failed_writes += 1
# print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
- # Read back the register value, ensure it matches the flipped value.
+ # Read back the register value, ensure it matches the flipped
+ # value.
if P_response == "OK":
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
verify_p_response_raw = context.get("p_response")
self.assertIsNotNone(verify_p_response_raw)
- verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
+ verify_bits = unpack_register_hex_unsigned(
+ endian, verify_p_response_raw)
if verify_bits != flipped_bits_int:
# Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
# print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
successful_writes -= 1
- failed_writes +=1
+ failed_writes += 1
return (successful_writes, failed_writes)
@@ -1136,7 +1306,8 @@ class GdbRemoteTestCaseBase(TestBase):
return False
if reg_info["set"] != "General Purpose Registers":
return False
- if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
+ if ("container-regs" in reg_info) and (
+ len(reg_info["container-regs"]) > 0):
# Don't try to bit flip registers contained in another register.
return False
if re.match("^.s$", reg_info["name"]):
@@ -1154,13 +1325,15 @@ class GdbRemoteTestCaseBase(TestBase):
values = {}
for reg_info in reg_infos:
- # We append a register index when load reg infos so we can work with subsets.
+ # We append a register index when load reg infos so we can work
+ # with subsets.
reg_index = reg_info.get("lldb_register_index")
self.assertIsNotNone(reg_index)
# Handle thread suffix.
if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
+ p_request = "read packet: $p{:x};thread:{:x}#00".format(
+ reg_index, thread_id)
else:
p_request = "read packet: $p{:x}#00".format(reg_index)
@@ -1168,8 +1341,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1178,58 +1351,75 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(p_response)
self.assertTrue(len(p_response) > 0)
self.assertFalse(p_response[0] == "E")
-
- values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
-
+
+ values[reg_index] = unpack_register_hex_unsigned(
+ endian, p_response)
+
return values
def add_vCont_query_packets(self):
- self.test_sequence.add_log_lines([
- "read packet: $vCont?#49",
- {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
- ], True)
+ self.test_sequence.add_log_lines(["read packet: $vCont?#49",
+ {"direction": "send",
+ "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
+ "capture": {2: "vCont_query_response"}},
+ ],
+ True)
def parse_vCont_query_response(self, context):
self.assertIsNotNone(context)
vCont_query_response = context.get("vCont_query_response")
- # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
+ # Handle case of no vCont support at all - in which case the capture
+ # group will be none or zero length.
if not vCont_query_response or len(vCont_query_response) == 0:
return {}
- return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
-
- def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
+ return {key: 1 for key in vCont_query_response.split(
+ ";") if key and len(key) > 0}
+
+ def count_single_steps_until_true(
+ self,
+ thread_id,
+ predicate,
+ args,
+ max_step_count=100,
+ use_Hc_packet=True,
+ step_instruction="s"):
"""Used by single step test that appears in a few different contexts."""
single_step_count = 0
while single_step_count < max_step_count:
self.assertIsNotNone(thread_id)
- # Build the packet for the single step instruction. We replace {thread}, if present, with the thread_id.
- step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
+ # Build the packet for the single step instruction. We replace
+ # {thread}, if present, with the thread_id.
+ step_packet = "read packet: ${}#00".format(
+ re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
# print("\nstep_packet created: {}\n".format(step_packet))
# Single step.
self.reset_test_sequence()
if use_Hc_packet:
self.test_sequence.add_log_lines(
- [# Set the continue thread.
- "read packet: $Hc{0:x}#00".format(thread_id),
- "send packet: $OK#00",
- ], True)
+ [ # Set the continue thread.
+ "read packet: $Hc{0:x}#00".format(thread_id),
+ "send packet: $OK#00",
+ ], True)
self.test_sequence.add_log_lines([
- # Single step.
- step_packet,
- # "read packet: $vCont;s:{0:x}#00".format(thread_id),
- # Expect a breakpoint stop report.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
- ], True)
+ # Single step.
+ step_packet,
+ # "read packet: $vCont;s:{0:x}#00".format(thread_id),
+ # Expect a breakpoint stop report.
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
+ "capture": {1: "stop_signo",
+ 2: "stop_thread_id"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
self.assertIsNotNone(context.get("stop_signo"))
self.assertEqual(int(context.get("stop_signo"), 16),
- lldbutil.get_signal_number('SIGTRAP'))
+ lldbutil.get_signal_number('SIGTRAP'))
single_step_count += 1
@@ -1251,9 +1441,9 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
"read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
True)
# Run the packet stream.
@@ -1264,26 +1454,34 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(context.get("g_c1_contents"))
self.assertIsNotNone(context.get("g_c2_contents"))
- return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
+ return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (
+ context.get("g_c2_contents").decode("hex") == expected_g_c2)
- def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
+ def single_step_only_steps_one_instruction(
+ self, use_Hc_packet=True, step_instruction="s"):
"""Used by single step test that appears in a few different contexts."""
# Start up the inferior.
procs = self.prep_debug_monitor_and_inferior(
- inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])
+ inferior_args=[
+ "get-code-address-hex:swap_chars",
+ "get-data-address-hex:g_c1",
+ "get-data-address-hex:g_c2",
+ "sleep:1",
+ "call-function:swap_chars",
+ "sleep:5"])
# Run the process
self.test_sequence.add_log_lines(
- [# Start running after initial stop.
- "read packet: $c#63",
- # Match output line that prints the memory address of the function call entry point.
- # Note we require launch-only testing so we can get inferior otuput.
- { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
- "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
- # Now stop the inferior.
- "read packet: {}".format(chr(3)),
- # And wait for the stop notification.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
+ [ # Start running after initial stop.
+ "read packet: $c#63",
+ # Match output line that prints the memory address of the function call entry point.
+ # Note we require launch-only testing so we can get inferior otuput.
+ {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
+ "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
+ # Now stop the inferior.
+ "read packet: {}".format(chr(3)),
+ # And wait for the stop notification.
+ {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
True)
# Run the packet stream.
@@ -1312,13 +1510,17 @@ class GdbRemoteTestCaseBase(TestBase):
else:
BREAKPOINT_KIND = 1
self.reset_test_sequence()
- self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
+ self.add_set_breakpoint_packets(
+ function_address,
+ do_continue=True,
+ breakpoint_kind=BREAKPOINT_KIND)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Remove the breakpoint.
self.reset_test_sequence()
- self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
+ self.add_remove_breakpoint_packets(
+ function_address, breakpoint_kind=BREAKPOINT_KIND)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1331,42 +1533,83 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertTrue(self.g_c1_c2_contents_are(args))
- # Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code.
+ # Verify we take only a small number of steps to hit the first state.
+ # Might need to work through function entry prologue code.
args["expected_g_c1"] = "1"
args["expected_g_c2"] = "1"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=25,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
# Verify we hit the next state.
args["expected_g_c1"] = "1"
args["expected_g_c2"] = "0"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
expected_step_count = 1
arch = self.getArchitecture()
- #MIPS required "3" (ADDIU, SB, LD) machine instructions for updation of variable value
- if re.match("mips",arch):
- expected_step_count = 3
- #S390X requires "2" (LARL, MVI) machine instructions for updation of variable value
- if re.match("s390x",arch):
- expected_step_count = 2
+ # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
+ # of variable value
+ if re.match("mips", arch):
+ expected_step_count = 3
+ # S390X requires "2" (LARL, MVI) machine instructions for updation of
+ # variable value
+ if re.match("s390x", arch):
+ expected_step_count = 2
self.assertEqual(step_count, expected_step_count)
# Verify we hit the next state.
args["expected_g_c1"] = "0"
args["expected_g_c2"] = "0"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
self.assertEqual(step_count, expected_step_count)
# Verify we hit the next state.
args["expected_g_c1"] = "0"
args["expected_g_c2"] = "1"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
self.assertEqual(step_count, expected_step_count)
def maybe_strict_output_regex(self, regex):
- return '.*'+regex+'.*' if lldbplatformutil.hasChattyStderr(self) else '^'+regex+'$'
-
+ return '.*' + regex + \
+ '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
+
+ def install_and_create_launch_args(self):
+ exe_path = os.path.abspath('a.out')
+ if not lldb.remote_platform:
+ return [exe_path]
+ remote_path = lldbutil.append_to_process_working_directory(
+ os.path.basename(exe_path))
+ remote_file_spec = lldb.SBFileSpec(remote_path, False)
+ err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
+ remote_file_spec)
+ if err.Fail():
+ raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
+ (exe_path, remote_path, err))
+ return [remote_path]