diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py')
-rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py | 685 |
1 files changed, 464 insertions, 221 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py index d63ddbe39998..ef6a0a21aaeb 100644 --- a/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py +++ b/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py @@ -5,7 +5,6 @@ Base class for gdb-remote test cases. from __future__ import print_function - import errno import os import os.path @@ -24,9 +23,11 @@ from lldbsuite.test.lldbtest import * from lldbgdbserverutils import * import logging + class _ConnectionRefused(IOError): pass + class GdbRemoteTestCaseBase(TestBase): NO_DEBUG_INFO_TESTCASE = True @@ -35,29 +36,34 @@ class GdbRemoteTestCaseBase(TestBase): _GDBREMOTE_KILL_PACKET = "$k#6b" - # Start the inferior separately, attach to the inferior on the stub command line. + # Start the inferior separately, attach to the inferior on the stub + # command line. _STARTUP_ATTACH = "attach" - # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid). + # Start the inferior separately, start the stub without attaching, allow + # the test to attach to the inferior however it wants (e.g. $vAttach;pid). _STARTUP_ATTACH_MANUALLY = "attach_manually" - # Start the stub, and launch the inferior with an $A packet via the initial packet stream. + # Start the stub, and launch the inferior with an $A packet via the + # initial packet stream. _STARTUP_LAUNCH = "launch" - # GDB Signal numbers that are not target-specific used for common exceptions - TARGET_EXC_BAD_ACCESS = 0x91 + # GDB Signal numbers that are not target-specific used for common + # exceptions + TARGET_EXC_BAD_ACCESS = 0x91 TARGET_EXC_BAD_INSTRUCTION = 0x92 - TARGET_EXC_ARITHMETIC = 0x93 - TARGET_EXC_EMULATION = 0x94 - TARGET_EXC_SOFTWARE = 0x95 - TARGET_EXC_BREAKPOINT = 0x96 + TARGET_EXC_ARITHMETIC = 0x93 + TARGET_EXC_EMULATION = 0x94 + TARGET_EXC_SOFTWARE = 0x95 + TARGET_EXC_BREAKPOINT = 0x96 _verbose_log_handler = None - _log_formatter = logging.Formatter(fmt='%(asctime)-15s %(levelname)-8s %(message)s') + _log_formatter = logging.Formatter( + fmt='%(asctime)-15s %(levelname)-8s %(message)s') def setUpBaseLogging(self): self.logger = logging.getLogger(__name__) if len(self.logger.handlers) > 0: - return # We have set up this handler already + return # We have set up this handler already self.logger.propagate = False self.logger.setLevel(logging.DEBUG) @@ -68,11 +74,11 @@ class GdbRemoteTestCaseBase(TestBase): handler.setFormatter(self._log_formatter) self.logger.addHandler(handler) - def isVerboseLoggingRequested(self): # We will report our detailed logs if the user requested that the "gdb-remote" channel is # logged. - return any(("gdb-remote" in channel) for channel in lldbtest_config.channels) + return any(("gdb-remote" in channel) + for channel in lldbtest_config.channels) def setUp(self): TestBase.setUp(self) @@ -83,7 +89,8 @@ class GdbRemoteTestCaseBase(TestBase): if self.isVerboseLoggingRequested(): # If requested, full logs go to a log file - self._verbose_log_handler = logging.FileHandler(self.log_basename + "-host.log") + self._verbose_log_handler = logging.FileHandler( + self.log_basename + "-host.log") self._verbose_log_handler.setFormatter(self._log_formatter) self._verbose_log_handler.setLevel(logging.DEBUG) self.logger.addHandler(self._verbose_log_handler) @@ -100,7 +107,8 @@ class GdbRemoteTestCaseBase(TestBase): url_pattern = '(.+)://\[?(.+?)\]?/.*' else: url_pattern = '(.+)://(.+):\d+' - scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups() + scheme, host = re.match( + url_pattern, configuration.lldb_platform_url).groups() if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': self.stub_device = host self.stub_hostname = 'localhost' @@ -122,21 +130,24 @@ class GdbRemoteTestCaseBase(TestBase): def setUpServerLogging(self, is_llgs): if len(lldbtest_config.channels) == 0: - return # No logging requested + return # No logging requested if lldb.remote_platform: - log_file = lldbutil.join_remote_paths(lldb.remote_platform.GetWorkingDirectory(), "server.log") + log_file = lldbutil.join_remote_paths( + lldb.remote_platform.GetWorkingDirectory(), "server.log") else: log_file = self.getLocalServerLogFile() if is_llgs: self.debug_monitor_extra_args.append("--log-file=" + log_file) - self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels))) + self.debug_monitor_extra_args.append( + "--log-channels={}".format(":".join(lldbtest_config.channels))) else: - self.debug_monitor_extra_args = ["--log-file=" + self.log_file, "--log-flags=0x800000"] + self.debug_monitor_extra_args = [ + "--log-file=" + log_file, "--log-flags=0x800000"] def get_next_port(self): - return 12000 + random.randint(0,3999) + return 12000 + random.randint(0, 3999) def reset_test_sequence(self): self.test_sequence = GdbRemoteTestSequence(self.logger) @@ -149,7 +160,8 @@ class GdbRemoteTestCaseBase(TestBase): # Create the named pipe. os.mkfifo(named_pipe_path) - # Open the read side of the pipe in non-blocking mode. This will return right away, ready or not. + # Open the read side of the pipe in non-blocking mode. This will + # return right away, ready or not. named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK) # Create the file for the named pipe. Note this will follow semantics of @@ -177,7 +189,9 @@ class GdbRemoteTestCaseBase(TestBase): try: os.rmdir(temp_dir) except: - print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir))) + print( + "failed to delete temp dir: {}, directory contents: '{}'".format( + temp_dir, os.listdir(temp_dir))) None # Add the shutdown hook to clean up the named pipe. @@ -190,14 +204,23 @@ class GdbRemoteTestCaseBase(TestBase): def get_stub_port_from_named_socket(self, read_timeout_seconds=5): # Wait for something to read with a max timeout. - (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds) - self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.") - self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.") + (ready_readers, _, _) = select.select( + [self.named_pipe_fd], [], [], read_timeout_seconds) + self.assertIsNotNone( + ready_readers, + "write side of pipe has not written anything - stub isn't writing to pipe.") + self.assertNotEqual( + len(ready_readers), + 0, + "write side of pipe has not written anything - stub isn't writing to pipe.") # Read the port from the named pipe. stub_port_raw = self.named_pipe.read() self.assertIsNotNone(stub_port_raw) - self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe") + self.assertNotEqual( + len(stub_port_raw), + 0, + "no content to read on pipe") # Trim null byte, convert to int. stub_port_raw = stub_port_raw[:-1] @@ -212,15 +235,24 @@ class GdbRemoteTestCaseBase(TestBase): use_named_pipe = False # Grab the ppid from /proc/[shell pid]/stat - err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat") - self.assertTrue(err.Success() and retcode == 0, - "Failed to read file /proc/$$/stat: %s, retcode: %d" % (err.GetCString(), retcode)) + err, retcode, shell_stat = self.run_platform_command( + "cat /proc/$$/stat") + self.assertTrue( + err.Success() and retcode == 0, + "Failed to read file /proc/$$/stat: %s, retcode: %d" % + (err.GetCString(), + retcode)) # [pid] ([executable]) [state] [*ppid*] pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) - err, retcode, ls_output = self.run_platform_command("ls -l /proc/%s/exe" % pid) - self.assertTrue(err.Success() and retcode == 0, - "Failed to read file /proc/%s/exe: %s, retcode: %d" % (pid, err.GetCString(), retcode)) + err, retcode, ls_output = self.run_platform_command( + "ls -l /proc/%s/exe" % pid) + self.assertTrue( + err.Success() and retcode == 0, + "Failed to read file /proc/%s/exe: %s, retcode: %d" % + (pid, + err.GetCString(), + retcode)) exe = ls_output.split()[-1] # If the binary has been deleted, the link name has " (deleted)" appended. @@ -235,7 +267,8 @@ class GdbRemoteTestCaseBase(TestBase): self.setUpServerLogging(is_llgs=True) if use_named_pipe: - (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe() + (self.named_pipe_path, self.named_pipe, + self.named_pipe_fd) = self.create_named_pipe() def init_debugserver_test(self, use_named_pipe=True): self.debug_monitor_exe = get_debugserver_exe() @@ -243,17 +276,19 @@ class GdbRemoteTestCaseBase(TestBase): self.skipTest("debugserver exe not found") self.setUpServerLogging(is_llgs=False) if use_named_pipe: - (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe() + (self.named_pipe_path, self.named_pipe, + self.named_pipe_fd) = self.create_named_pipe() # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification # when the process truly dies. self.stub_sends_two_stop_notifications_on_kill = True def forward_adb_port(self, source, target, direction, device): - adb = [ 'adb' ] + ([ '-s', device ] if device else []) + [ direction ] + adb = ['adb'] + (['-s', device] if device else []) + [direction] + def remove_port_forward(): - subprocess.call(adb + [ "--remove", "tcp:%d" % source]) + subprocess.call(adb + ["--remove", "tcp:%d" % source]) - subprocess.call(adb + [ "tcp:%d" % source, "tcp:%d" % target]) + subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) self.addTearDownHook(remove_port_forward) def _verify_socket(self, sock): @@ -265,12 +300,12 @@ class GdbRemoteTestCaseBase(TestBase): # connection again. triple = self.dbg.GetSelectedPlatform().GetTriple() if not re.match(".*-.*-.*-android", triple): - return # Not android. + return # Not android. can_read, _, _ = select.select([sock], [], [], 0.1) if sock not in can_read: - return # Data is not available, but the connection is alive. + return # Data is not available, but the connection is alive. if len(sock.recv(1, socket.MSG_PEEK)) == 0: - raise _ConnectionRefused() # Got EOF, connection dropped. + raise _ConnectionRefused() # Got EOF, connection dropped. def create_socket(self): sock = socket.socket() @@ -278,9 +313,16 @@ class GdbRemoteTestCaseBase(TestBase): triple = self.dbg.GetSelectedPlatform().GetTriple() if re.match(".*-.*-.*-android", triple): - self.forward_adb_port(self.port, self.port, "forward", self.stub_device) - - logger.info("Connecting to debug monitor on %s:%d", self.stub_hostname, self.port) + self.forward_adb_port( + self.port, + self.port, + "forward", + self.stub_device) + + logger.info( + "Connecting to debug monitor on %s:%d", + self.stub_hostname, + self.port) connect_info = (self.stub_hostname, self.port) try: sock.connect(connect_info) @@ -295,12 +337,16 @@ class GdbRemoteTestCaseBase(TestBase): # send the kill packet so lldb-server shuts down gracefully sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) except: - logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0])) + logger.warning( + "failed to send kill packet to debug monitor: {}; ignoring".format( + sys.exc_info()[0])) try: sock.close() except: - logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0])) + logger.warning( + "failed to close socket to debug monitor: {}; ignoring".format( + sys.exc_info()[0])) self.addTearDownHook(shutdown_socket) @@ -319,9 +365,11 @@ class GdbRemoteTestCaseBase(TestBase): def get_debug_monitor_command_line_args(self, attach_pid=None): if lldb.remote_platform: - commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)] + commandline_args = self.debug_monitor_extra_args + \ + ["*:{}".format(self.port)] else: - commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)] + commandline_args = self.debug_monitor_extra_args + \ + ["localhost:{}".format(self.port)] if attach_pid: commandline_args += ["--attach=%d" % attach_pid] @@ -331,14 +379,19 @@ class GdbRemoteTestCaseBase(TestBase): def launch_debug_monitor(self, attach_pid=None, logfile=None): # Create the command line. - commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid) + commandline_args = self.get_debug_monitor_command_line_args( + attach_pid=attach_pid) # Start the server. - server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False) + server = self.spawnSubprocess( + self.debug_monitor_exe, + commandline_args, + install_remote=False) self.addTearDownHook(self.cleanupSubprocesses) self.assertIsNotNone(server) - # If we're receiving the stub's listening port from the named pipe, do that here. + # If we're receiving the stub's listening port from the named pipe, do + # that here. if self.named_pipe: self.port = self.get_stub_port_from_named_socket() @@ -354,7 +407,9 @@ class GdbRemoteTestCaseBase(TestBase): try: server.terminate() except: - logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0])) + logger.warning( + "failed to terminate server for debug monitor: {}; ignoring".format( + sys.exc_info()[0])) self.addTearDownHook(shutdown_debug_monitor) # Schedule debug monitor to be shut down during teardown. @@ -374,11 +429,14 @@ class GdbRemoteTestCaseBase(TestBase): # Schedule debug monitor to be shut down during teardown. logger = self.logger + def shutdown_debug_monitor(): try: server.terminate() except: - logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0])) + logger.warning( + "failed to terminate server for debug monitor: {}; ignoring".format( + sys.exc_info()[0])) self.addTearDownHook(shutdown_debug_monitor) connect_attemps = 0 @@ -387,7 +445,7 @@ class GdbRemoteTestCaseBase(TestBase): while connect_attemps < MAX_CONNECT_ATTEMPTS: # Create a socket to talk to the server try: - logger.info("Connect attempt %d", connect_attemps+1) + logger.info("Connect attempt %d", connect_attemps + 1) self.sock = self.create_socket() return server except _ConnectionRefused as serr: @@ -400,18 +458,27 @@ class GdbRemoteTestCaseBase(TestBase): server.terminate() # Increment attempts. - print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS)) + print( + "connect to debug monitor on port %d failed, attempt #%d of %d" % + (self.port, attempts + 1, MAX_ATTEMPTS)) attempts += 1 - # And wait a random length of time before next attempt, to avoid collisions. - time.sleep(random.randint(1,5)) - + # And wait a random length of time before next attempt, to avoid + # collisions. + time.sleep(random.randint(1, 5)) + # Now grab a new port number. self.port = self.get_next_port() - raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts) + raise Exception( + "failed to create a socket to the launched debug monitor after %d tries" % + attempts) - def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None): + def launch_process_for_attach( + self, + inferior_args=None, + sleep_seconds=3, + exe_path=None): # We're going to start a child process that the debug monitor stub can later attach to. # This process needs to be started so that it just hangs around for a while. We'll # have it sleep. @@ -425,15 +492,22 @@ class GdbRemoteTestCaseBase(TestBase): args.append("sleep:%d" % sleep_seconds) inferior = self.spawnSubprocess(exe_path, args) + def shutdown_process_for_attach(): try: inferior.terminate() except: - logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0])) + logger.warning( + "failed to terminate inferior process for attach: {}; ignoring".format( + sys.exc_info()[0])) self.addTearDownHook(shutdown_process_for_attach) return inferior - def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None): + def prep_debug_monitor_and_inferior( + self, + inferior_args=None, + inferior_sleep_seconds=3, + inferior_exe_path=None): """Prep the debug monitor, the inferior, and the expected packet stream. Handle the separate cases of using the debug monitor in attach-to-inferior mode @@ -458,11 +532,15 @@ class GdbRemoteTestCaseBase(TestBase): if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: # Launch the process that we'll use as the inferior. - inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path) + inferior = self.launch_process_for_attach( + inferior_args=inferior_args, + sleep_seconds=inferior_sleep_seconds, + exe_path=inferior_exe_path) self.assertIsNotNone(inferior) self.assertTrue(inferior.pid > 0) if self._inferior_startup == self._STARTUP_ATTACH: - # In this case, we want the stub to attach via the command line, so set the command line attach pid here. + # In this case, we want the stub to attach via the command + # line, so set the command line attach pid here. attach_pid = inferior.pid if self._inferior_startup == self._STARTUP_LAUNCH: @@ -471,11 +549,15 @@ class GdbRemoteTestCaseBase(TestBase): inferior_exe_path = os.path.abspath("a.out") if lldb.remote_platform: - remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path)) + remote_path = lldbutil.append_to_process_working_directory( + os.path.basename(inferior_exe_path)) remote_file_spec = lldb.SBFileSpec(remote_path, False) - err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec) + err = lldb.remote_platform.Install(lldb.SBFileSpec( + inferior_exe_path, True), remote_file_spec) if err.Fail(): - raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err)) + raise Exception( + "remote_platform.Install('%s', '%s') failed: %s" % + (inferior_exe_path, remote_path, err)) inferior_exe_path = remote_path launch_args = [inferior_exe_path] @@ -491,13 +573,18 @@ class GdbRemoteTestCaseBase(TestBase): if self._inferior_startup == self._STARTUP_LAUNCH: self.add_verified_launch_packets(launch_args) - return {"inferior":inferior, "server":server} + return {"inferior": inferior, "server": server} - def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds): + def expect_socket_recv( + self, + sock, + expected_content_regex, + timeout_seconds): response = "" timeout_time = time.time() + timeout_seconds - while not expected_content_regex.match(response) and time.time() < timeout_time: + while not expected_content_regex.match( + response) and time.time() < timeout_time: can_read, _, _ = select.select([sock], [], [], timeout_seconds) if can_read and sock in can_read: recv_bytes = sock.recv(4096) @@ -514,7 +601,8 @@ class GdbRemoteTestCaseBase(TestBase): _, can_write, _ = select.select([], [sock], [], timeout_seconds) if can_write and sock in can_write: written_byte_count = sock.send(request_bytes_remaining) - request_bytes_remaining = request_bytes_remaining[written_byte_count:] + request_bytes_remaining = request_bytes_remaining[ + written_byte_count:] self.assertEqual(len(request_bytes_remaining), 0) def do_handshake(self, stub_socket, timeout_seconds=5): @@ -527,7 +615,8 @@ class GdbRemoteTestCaseBase(TestBase): self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST)) # Receive the ack and "OK" - self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds) + self.expect_socket_recv(stub_socket, re.compile( + r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds) # Send the final ack. self.expect_socket_send(stub_socket, "+", timeout_seconds) @@ -553,12 +642,12 @@ class GdbRemoteTestCaseBase(TestBase): self.test_sequence.add_log_lines( ["read packet: $QThreadSuffixSupported#e4", "send packet: $OK#00", - ], True) + ], True) def add_process_info_collection_packets(self): self.test_sequence.add_log_lines( ["read packet: $qProcessInfo#dc", - { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }], + {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], True) _KNOWN_PROCESS_INFO_KEYS = [ @@ -574,8 +663,9 @@ class GdbRemoteTestCaseBase(TestBase): "triple", "vendor", "endian", + "elf_abi", "ptrsize" - ] + ] def parse_process_info_response(self, context): # Ensure we have a process info response. @@ -584,7 +674,9 @@ class GdbRemoteTestCaseBase(TestBase): self.assertIsNotNone(process_info_raw) # Pull out key:value; pairs. - process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) } + process_info_dict = { + match.group(1): match.group(2) for match in re.finditer( + r"([^:]+):([^;]+);", process_info_raw)} # Validate keys are known. for (key, val) in list(process_info_dict.items()): @@ -595,9 +687,9 @@ class GdbRemoteTestCaseBase(TestBase): def add_register_info_collection_packets(self): self.test_sequence.add_log_lines( - [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True, - "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), - "save_key":"reg_info_responses" } ], + [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, + "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), + "save_key": "reg_info_responses"}], True) def parse_register_info_packets(self, context): @@ -606,13 +698,19 @@ class GdbRemoteTestCaseBase(TestBase): self.assertIsNotNone(reg_info_responses) # Parse register infos. - return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses] + return [parse_reg_info_response(reg_info_response) + for reg_info_response in reg_info_responses] def expect_gdbremote_sequence(self, timeout_seconds=None): if not timeout_seconds: timeout_seconds = self._TIMEOUT_SECONDS - return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, - self._pump_queues, timeout_seconds, self.logger) + return expect_lldb_gdbserver_replay( + self, + self.sock, + self.test_sequence, + self._pump_queues, + timeout_seconds, + self.logger) _KNOWN_REGINFO_KEYS = [ "name", @@ -627,7 +725,9 @@ class GdbRemoteTestCaseBase(TestBase): "dwarf", "generic", "container-regs", - "invalidate-regs" + "invalidate-regs", + "dynamic_size_dwarf_expr_bytes", + "dynamic_size_dwarf_len" ] def assert_valid_reg_info(self, reg_info): @@ -667,7 +767,7 @@ class GdbRemoteTestCaseBase(TestBase): def add_query_memory_region_packets(self, address): self.test_sequence.add_log_lines( ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), - {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }], + {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], True) def parse_key_val_dict(self, key_val_text, allow_dupes=True): @@ -678,13 +778,15 @@ class GdbRemoteTestCaseBase(TestBase): val = match.group(2) if key in kv_dict: if allow_dupes: - if type(kv_dict[key]) == list: + if isinstance(kv_dict[key], list): kv_dict[key].append(val) else: # Promote to list kv_dict[key] = [kv_dict[key], val] else: - self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict)) + self.fail( + "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( + key, val, key_val_text, kv_dict)) else: kv_dict[key] = val return kv_dict @@ -694,17 +796,25 @@ class GdbRemoteTestCaseBase(TestBase): self.assertIsNotNone(context.get("memory_region_response")) # Pull out key:value; pairs. - mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response")) + mem_region_dict = self.parse_key_val_dict( + context.get("memory_region_response")) # Validate keys are known. for (key, val) in list(mem_region_dict.items()): - self.assertTrue(key in ["start", "size", "permissions", "error"]) + self.assertTrue( + key in [ + "start", + "size", + "permissions", + "name", + "error"]) self.assertIsNotNone(val) # Return the dictionary of key-value pairs for the memory region. return mem_region_dict - def assert_address_within_memory_region(self, test_address, mem_region_dict): + def assert_address_within_memory_region( + self, test_address, mem_region_dict): self.assertIsNotNone(mem_region_dict) self.assertTrue("start" in mem_region_dict) self.assertTrue("size" in mem_region_dict) @@ -714,15 +824,25 @@ class GdbRemoteTestCaseBase(TestBase): range_end = range_start + range_size if test_address < range_start: - self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size)) + self.fail( + "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( + test_address, + range_start, + range_end, + range_size)) elif test_address >= range_end: - self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size)) + self.fail( + "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( + test_address, + range_start, + range_end, + range_size)) def add_threadinfo_collection_packets(self): self.test_sequence.add_log_lines( - [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo", - "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), - "save_key":"threadinfo_responses" } ], + [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", + "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), + "save_key": "threadinfo_responses"}], True) def parse_threadinfo_packets(self, context): @@ -760,35 +880,44 @@ class GdbRemoteTestCaseBase(TestBase): return threads - def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1): + def add_set_breakpoint_packets( + self, + address, + do_continue=True, + breakpoint_kind=1): self.test_sequence.add_log_lines( - [# Set the breakpoint. - "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind), - # Verify the stub could set it. - "send packet: $OK#00", - ], True) + [ # Set the breakpoint. + "read packet: $Z0,{0:x},{1}#00".format( + address, breakpoint_kind), + # Verify the stub could set it. + "send packet: $OK#00", + ], True) if (do_continue): self.test_sequence.add_log_lines( - [# Continue the inferior. - "read packet: $c#63", - # Expect a breakpoint stop report. - {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }, - ], True) + [ # Continue the inferior. + "read packet: $c#63", + # Expect a breakpoint stop report. + {"direction": "send", + "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", + "capture": {1: "stop_signo", + 2: "stop_thread_id"}}, + ], True) def add_remove_breakpoint_packets(self, address, breakpoint_kind=1): self.test_sequence.add_log_lines( - [# Remove the breakpoint. - "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind), - # Verify the stub could unset it. - "send packet: $OK#00", + [ # Remove the breakpoint. + "read packet: $z0,{0:x},{1}#00".format( + address, breakpoint_kind), + # Verify the stub could unset it. + "send packet: $OK#00", ], True) def add_qSupported_packets(self): self.test_sequence.add_log_lines( ["read packet: $qSupported#00", - {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}}, - ], True) + {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, + ], True) _KNOWN_QSUPPORTED_STUB_FEATURES = [ "augmented-libraries-svr4-read", @@ -821,23 +950,27 @@ class GdbRemoteTestCaseBase(TestBase): supported_dict[key] = val else: if len(key) < 2: - raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}") + raise Exception( + "singular stub feature is too short: must be stub_feature{+,-,?}") supported_type = key[-1] key = key[:-1] if not supported_type in ["+", "-", "?"]: - raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) - supported_dict[key] = supported_type + raise Exception( + "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) + supported_dict[key] = supported_type # Ensure we know the supported element - if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES: - raise Exception("unknown qSupported stub feature reported: %s" % key) + if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: + raise Exception( + "unknown qSupported stub feature reported: %s" % + key) return supported_dict def run_process_then_stop(self, run_seconds=1): # Tell the stub to continue. self.test_sequence.add_log_lines( - ["read packet: $vCont;c#a8"], - True) + ["read packet: $vCont;c#a8"], + True) context = self.expect_gdbremote_sequence() # Wait for run_seconds. @@ -847,7 +980,7 @@ class GdbRemoteTestCaseBase(TestBase): self.reset_test_sequence() self.test_sequence.add_log_lines( ["read packet: {}".format(chr(3)), - {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }], + {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) @@ -857,18 +990,21 @@ class GdbRemoteTestCaseBase(TestBase): def select_modifiable_register(self, reg_infos): """Find a register that can be read/written freely.""" - PREFERRED_REGISTER_NAMES = set(["rax",]) + PREFERRED_REGISTER_NAMES = set(["rax", ]) - # First check for the first register from the preferred register name set. + # First check for the first register from the preferred register name + # set. alternative_register_index = None self.assertIsNotNone(reg_infos) for reg_info in reg_infos: - if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES): + if ("name" in reg_info) and ( + reg_info["name"] in PREFERRED_REGISTER_NAMES): # We found a preferred register. Use it. return reg_info["lldb_register_index"] if ("generic" in reg_info) and (reg_info["generic"] == "fp"): - # A frame pointer register will do as a register to modify temporarily. + # A frame pointer register will do as a register to modify + # temporarily. alternative_register_index = reg_info["lldb_register_index"] # We didn't find a preferred register. Return whatever alternative register @@ -901,7 +1037,8 @@ class GdbRemoteTestCaseBase(TestBase): def find_generic_register_with_name(self, reg_infos, generic_name): self.assertIsNotNone(reg_infos) for reg_info in reg_infos: - if ("generic" in reg_info) and (reg_info["generic"] == generic_name): + if ("generic" in reg_info) and ( + reg_info["generic"] == generic_name): return reg_info return None @@ -912,13 +1049,13 @@ class GdbRemoteTestCaseBase(TestBase): if encoded_bytes[i] == "}": # Handle escaped char. self.assertTrue(i + 1 < len(encoded_bytes)) - decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20) - i +=2 + decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) + i += 2 elif encoded_bytes[i] == "*": # Handle run length encoding. self.assertTrue(len(decoded_bytes) > 0) self.assertTrue(i + 1 < len(encoded_bytes)) - repeat_count = ord(encoded_bytes[i+1]) - 29 + repeat_count = ord(encoded_bytes[i + 1]) - 29 decoded_bytes += decoded_bytes[-1] * repeat_count i += 2 else: @@ -955,7 +1092,8 @@ class GdbRemoteTestCaseBase(TestBase): self.assertFalse(key in auxv_dict) auxv_dict[key] = value - self.fail("should not reach here - implies required double zero entry not found") + self.fail( + "should not reach here - implies required double zero entry not found") return auxv_dict def read_binary_data_in_chunks(self, command_prefix, chunk_length): @@ -967,10 +1105,21 @@ class GdbRemoteTestCaseBase(TestBase): while not done: # Grab the next iteration of data. self.reset_test_sequence() - self.test_sequence.add_log_lines([ - "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length), - {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} } - ], True) + self.test_sequence.add_log_lines( + [ + "read packet: ${}{:x},{:x}:#00".format( + command_prefix, + offset, + chunk_length), + { + "direction": "send", + "regex": re.compile( + r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", + re.MULTILINE | re.DOTALL), + "capture": { + 1: "response_type", + 2: "content_raw"}}], + True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) @@ -997,25 +1146,32 @@ class GdbRemoteTestCaseBase(TestBase): # Send the intterupt. "read packet: {}".format(chr(3)), # And wait for the stop notification. - {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } }, - ], True) + {"direction": "send", + "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", + "capture": {1: "stop_signo", + 2: "stop_key_val_text"}}, + ], True) def parse_interrupt_packets(self, context): self.assertIsNotNone(context.get("stop_signo")) self.assertIsNotNone(context.get("stop_key_val_text")) - return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"])) + return (int(context["stop_signo"], 16), self.parse_key_val_dict( + context["stop_key_val_text"])) def add_QSaveRegisterState_packets(self, thread_id): if thread_id: # Use the thread suffix form. - request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id) + request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( + thread_id) else: request = "read packet: $QSaveRegisterState#00" - - self.test_sequence.add_log_lines([ - request, - {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } }, - ], True) + + self.test_sequence.add_log_lines([request, + {"direction": "send", + "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", + "capture": {1: "save_response"}}, + ], + True) def parse_QSaveRegisterState_response(self, context): self.assertIsNotNone(context) @@ -1032,16 +1188,19 @@ class GdbRemoteTestCaseBase(TestBase): def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): if thread_id: # Use the thread suffix form. - request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id) + request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( + save_id, thread_id) else: - request = "read packet: $QRestoreRegisterState:{}#00".format(save_id) + request = "read packet: $QRestoreRegisterState:{}#00".format( + save_id) self.test_sequence.add_log_lines([ request, "send packet: $OK#00" - ], True) + ], True) - def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None): + def flip_all_bits_in_each_register_value( + self, reg_infos, endian, thread_id=None): self.assertIsNotNone(reg_infos) successful_writes = 0 @@ -1049,16 +1208,18 @@ class GdbRemoteTestCaseBase(TestBase): for reg_info in reg_infos: # Use the lldb register index added to the reg info. We're not necessarily - # working off a full set of register infos, so an inferred register index could be wrong. + # working off a full set of register infos, so an inferred register + # index could be wrong. reg_index = reg_info["lldb_register_index"] self.assertIsNotNone(reg_index) - reg_byte_size = int(reg_info["bitsize"])/8 + reg_byte_size = int(reg_info["bitsize"]) / 8 self.assertTrue(reg_byte_size > 0) # Handle thread suffix. if thread_id: - p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id) + p_request = "read packet: $p{:x};thread:{:x}#00".format( + reg_index, thread_id) else: p_request = "read packet: $p{:x}#00".format(reg_index) @@ -1066,15 +1227,16 @@ class GdbRemoteTestCaseBase(TestBase): self.reset_test_sequence() self.test_sequence.add_log_lines([ p_request, - { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} }, - ], True) + {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, + ], True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) # Verify the response length. p_response = context.get("p_response") self.assertIsNotNone(p_response) - initial_reg_value = unpack_register_hex_unsigned(endian, p_response) + initial_reg_value = unpack_register_hex_unsigned( + endian, p_response) # Flip the value by xoring with all 1s all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8) @@ -1083,16 +1245,22 @@ class GdbRemoteTestCaseBase(TestBase): # Handle thread suffix for P. if thread_id: - P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) + P_request = "read packet: $P{:x}={};thread:{:x}#00".format( + reg_index, pack_register_hex( + endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) else: - P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size)) + P_request = "read packet: $P{:x}={}#00".format( + reg_index, pack_register_hex( + endian, flipped_bits_int, byte_size=reg_byte_size)) # Write the flipped value to the register. self.reset_test_sequence() - self.test_sequence.add_log_lines([ - P_request, - { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} }, - ], True) + self.test_sequence.add_log_lines([P_request, + {"direction": "send", + "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", + "capture": {1: "P_response"}}, + ], + True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) @@ -1107,25 +1275,27 @@ class GdbRemoteTestCaseBase(TestBase): failed_writes += 1 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) - # Read back the register value, ensure it matches the flipped value. + # Read back the register value, ensure it matches the flipped + # value. if P_response == "OK": self.reset_test_sequence() self.test_sequence.add_log_lines([ p_request, - { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} }, - ], True) + {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, + ], True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) verify_p_response_raw = context.get("p_response") self.assertIsNotNone(verify_p_response_raw) - verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw) + verify_bits = unpack_register_hex_unsigned( + endian, verify_p_response_raw) if verify_bits != flipped_bits_int: # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) successful_writes -= 1 - failed_writes +=1 + failed_writes += 1 return (successful_writes, failed_writes) @@ -1136,7 +1306,8 @@ class GdbRemoteTestCaseBase(TestBase): return False if reg_info["set"] != "General Purpose Registers": return False - if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0): + if ("container-regs" in reg_info) and ( + len(reg_info["container-regs"]) > 0): # Don't try to bit flip registers contained in another register. return False if re.match("^.s$", reg_info["name"]): @@ -1154,13 +1325,15 @@ class GdbRemoteTestCaseBase(TestBase): values = {} for reg_info in reg_infos: - # We append a register index when load reg infos so we can work with subsets. + # We append a register index when load reg infos so we can work + # with subsets. reg_index = reg_info.get("lldb_register_index") self.assertIsNotNone(reg_index) # Handle thread suffix. if thread_id: - p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id) + p_request = "read packet: $p{:x};thread:{:x}#00".format( + reg_index, thread_id) else: p_request = "read packet: $p{:x}#00".format(reg_index) @@ -1168,8 +1341,8 @@ class GdbRemoteTestCaseBase(TestBase): self.reset_test_sequence() self.test_sequence.add_log_lines([ p_request, - { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} }, - ], True) + {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, + ], True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) @@ -1178,58 +1351,75 @@ class GdbRemoteTestCaseBase(TestBase): self.assertIsNotNone(p_response) self.assertTrue(len(p_response) > 0) self.assertFalse(p_response[0] == "E") - - values[reg_index] = unpack_register_hex_unsigned(endian, p_response) - + + values[reg_index] = unpack_register_hex_unsigned( + endian, p_response) + return values def add_vCont_query_packets(self): - self.test_sequence.add_log_lines([ - "read packet: $vCont?#49", - {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } }, - ], True) + self.test_sequence.add_log_lines(["read packet: $vCont?#49", + {"direction": "send", + "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", + "capture": {2: "vCont_query_response"}}, + ], + True) def parse_vCont_query_response(self, context): self.assertIsNotNone(context) vCont_query_response = context.get("vCont_query_response") - # Handle case of no vCont support at all - in which case the capture group will be none or zero length. + # Handle case of no vCont support at all - in which case the capture + # group will be none or zero length. if not vCont_query_response or len(vCont_query_response) == 0: return {} - return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0} - - def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"): + return {key: 1 for key in vCont_query_response.split( + ";") if key and len(key) > 0} + + def count_single_steps_until_true( + self, + thread_id, + predicate, + args, + max_step_count=100, + use_Hc_packet=True, + step_instruction="s"): """Used by single step test that appears in a few different contexts.""" single_step_count = 0 while single_step_count < max_step_count: self.assertIsNotNone(thread_id) - # Build the packet for the single step instruction. We replace {thread}, if present, with the thread_id. - step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) + # Build the packet for the single step instruction. We replace + # {thread}, if present, with the thread_id. + step_packet = "read packet: ${}#00".format( + re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) # print("\nstep_packet created: {}\n".format(step_packet)) # Single step. self.reset_test_sequence() if use_Hc_packet: self.test_sequence.add_log_lines( - [# Set the continue thread. - "read packet: $Hc{0:x}#00".format(thread_id), - "send packet: $OK#00", - ], True) + [ # Set the continue thread. + "read packet: $Hc{0:x}#00".format(thread_id), + "send packet: $OK#00", + ], True) self.test_sequence.add_log_lines([ - # Single step. - step_packet, - # "read packet: $vCont;s:{0:x}#00".format(thread_id), - # Expect a breakpoint stop report. - {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }, - ], True) + # Single step. + step_packet, + # "read packet: $vCont;s:{0:x}#00".format(thread_id), + # Expect a breakpoint stop report. + {"direction": "send", + "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", + "capture": {1: "stop_signo", + 2: "stop_thread_id"}}, + ], True) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) self.assertIsNotNone(context.get("stop_signo")) self.assertEqual(int(context.get("stop_signo"), 16), - lldbutil.get_signal_number('SIGTRAP')) + lldbutil.get_signal_number('SIGTRAP')) single_step_count += 1 @@ -1251,9 +1441,9 @@ class GdbRemoteTestCaseBase(TestBase): self.reset_test_sequence() self.test_sequence.add_log_lines( ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), - {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} }, + {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), - {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }], + {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], True) # Run the packet stream. @@ -1264,26 +1454,34 @@ class GdbRemoteTestCaseBase(TestBase): self.assertIsNotNone(context.get("g_c1_contents")) self.assertIsNotNone(context.get("g_c2_contents")) - return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2) + return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and ( + context.get("g_c2_contents").decode("hex") == expected_g_c2) - def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"): + def single_step_only_steps_one_instruction( + self, use_Hc_packet=True, step_instruction="s"): """Used by single step test that appears in a few different contexts.""" # Start up the inferior. procs = self.prep_debug_monitor_and_inferior( - inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"]) + inferior_args=[ + "get-code-address-hex:swap_chars", + "get-data-address-hex:g_c1", + "get-data-address-hex:g_c2", + "sleep:1", + "call-function:swap_chars", + "sleep:5"]) # Run the process self.test_sequence.add_log_lines( - [# Start running after initial stop. - "read packet: $c#63", - # Match output line that prints the memory address of the function call entry point. - # Note we require launch-only testing so we can get inferior otuput. - { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", - "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} }, - # Now stop the inferior. - "read packet: {}".format(chr(3)), - # And wait for the stop notification. - {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }], + [ # Start running after initial stop. + "read packet: $c#63", + # Match output line that prints the memory address of the function call entry point. + # Note we require launch-only testing so we can get inferior otuput. + {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", + "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, + # Now stop the inferior. + "read packet: {}".format(chr(3)), + # And wait for the stop notification. + {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], True) # Run the packet stream. @@ -1312,13 +1510,17 @@ class GdbRemoteTestCaseBase(TestBase): else: BREAKPOINT_KIND = 1 self.reset_test_sequence() - self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND) + self.add_set_breakpoint_packets( + function_address, + do_continue=True, + breakpoint_kind=BREAKPOINT_KIND) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) # Remove the breakpoint. self.reset_test_sequence() - self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND) + self.add_remove_breakpoint_packets( + function_address, breakpoint_kind=BREAKPOINT_KIND) context = self.expect_gdbremote_sequence() self.assertIsNotNone(context) @@ -1331,42 +1533,83 @@ class GdbRemoteTestCaseBase(TestBase): self.assertTrue(self.g_c1_c2_contents_are(args)) - # Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code. + # Verify we take only a small number of steps to hit the first state. + # Might need to work through function entry prologue code. args["expected_g_c1"] = "1" args["expected_g_c2"] = "1" - (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction) + (state_reached, + step_count) = self.count_single_steps_until_true(main_thread_id, + self.g_c1_c2_contents_are, + args, + max_step_count=25, + use_Hc_packet=use_Hc_packet, + step_instruction=step_instruction) self.assertTrue(state_reached) # Verify we hit the next state. args["expected_g_c1"] = "1" args["expected_g_c2"] = "0" - (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction) + (state_reached, + step_count) = self.count_single_steps_until_true(main_thread_id, + self.g_c1_c2_contents_are, + args, + max_step_count=5, + use_Hc_packet=use_Hc_packet, + step_instruction=step_instruction) self.assertTrue(state_reached) expected_step_count = 1 arch = self.getArchitecture() - #MIPS required "3" (ADDIU, SB, LD) machine instructions for updation of variable value - if re.match("mips",arch): - expected_step_count = 3 - #S390X requires "2" (LARL, MVI) machine instructions for updation of variable value - if re.match("s390x",arch): - expected_step_count = 2 + # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation + # of variable value + if re.match("mips", arch): + expected_step_count = 3 + # S390X requires "2" (LARL, MVI) machine instructions for updation of + # variable value + if re.match("s390x", arch): + expected_step_count = 2 self.assertEqual(step_count, expected_step_count) # Verify we hit the next state. args["expected_g_c1"] = "0" args["expected_g_c2"] = "0" - (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction) + (state_reached, + step_count) = self.count_single_steps_until_true(main_thread_id, + self.g_c1_c2_contents_are, + args, + max_step_count=5, + use_Hc_packet=use_Hc_packet, + step_instruction=step_instruction) self.assertTrue(state_reached) self.assertEqual(step_count, expected_step_count) # Verify we hit the next state. args["expected_g_c1"] = "0" args["expected_g_c2"] = "1" - (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction) + (state_reached, + step_count) = self.count_single_steps_until_true(main_thread_id, + self.g_c1_c2_contents_are, + args, + max_step_count=5, + use_Hc_packet=use_Hc_packet, + step_instruction=step_instruction) self.assertTrue(state_reached) self.assertEqual(step_count, expected_step_count) def maybe_strict_output_regex(self, regex): - return '.*'+regex+'.*' if lldbplatformutil.hasChattyStderr(self) else '^'+regex+'$' - + return '.*' + regex + \ + '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' + + def install_and_create_launch_args(self): + exe_path = os.path.abspath('a.out') + if not lldb.remote_platform: + return [exe_path] + remote_path = lldbutil.append_to_process_working_directory( + os.path.basename(exe_path)) + remote_file_spec = lldb.SBFileSpec(remote_path, False) + err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), + remote_file_spec) + if err.Fail(): + raise Exception("remote_platform.Install('%s', '%s') failed: %s" % + (exe_path, remote_path, err)) + return [remote_path] |