aboutsummaryrefslogtreecommitdiff
path: root/packages/Python/lldbsuite/test_event
diff options
context:
space:
mode:
authorDimitry Andric <dim@FreeBSD.org>2016-07-23 20:50:09 +0000
committerDimitry Andric <dim@FreeBSD.org>2016-07-23 20:50:09 +0000
commitf3fbd1c0586ff6ec7895991e6c28f61a503c36a8 (patch)
tree48d008fd3df8c0e73271a4b18474e0aac6dbfe33 /packages/Python/lldbsuite/test_event
parent2fc5d2d1dfaf623ce4e24cd8590565902f8c557c (diff)
downloadsrc-f3fbd1c0586ff6ec7895991e6c28f61a503c36a8.tar.gz
src-f3fbd1c0586ff6ec7895991e6c28f61a503c36a8.zip
Vendor import of lldb release_39 branch r276489:vendor/lldb/lldb-release_39-r276489
Notes
Notes: svn path=/vendor/lldb/dist/; revision=303241 svn path=/vendor/lldb/lldb-release_39-r276489/; revision=303242; tag=vendor/lldb/lldb-release_39-r276489
Diffstat (limited to 'packages/Python/lldbsuite/test_event')
-rw-r--r--packages/Python/lldbsuite/test_event/__init__.py0
-rw-r--r--packages/Python/lldbsuite/test_event/build_exception.py14
-rw-r--r--packages/Python/lldbsuite/test_event/dotest_channels.py207
-rw-r--r--packages/Python/lldbsuite/test_event/event_builder.py475
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/__init__.py162
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/curses.py229
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/dump_formatter.py23
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/pickled.py72
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/results_formatter.py717
-rw-r--r--packages/Python/lldbsuite/test_event/formatter/xunit.py592
-rw-r--r--packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py13
-rw-r--r--packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py70
-rw-r--r--packages/Python/lldbsuite/test_event/test/src/event_collector.py85
13 files changed, 2659 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test_event/__init__.py b/packages/Python/lldbsuite/test_event/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/__init__.py
diff --git a/packages/Python/lldbsuite/test_event/build_exception.py b/packages/Python/lldbsuite/test_event/build_exception.py
new file mode 100644
index 000000000000..4a7c5f4a9d38
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/build_exception.py
@@ -0,0 +1,14 @@
class BuildError(Exception):
    """Error raised when building a test subject fails.

    Captures the build command and its error output from the process
    error that the build step produced, so they can be reported together.
    """

    def __init__(self, called_process_error):
        """Builds the error from a failed build process error.

        @param called_process_error the error object raised by the build
        step.  Its optional "lldb_extensions" dictionary may carry the
        "command" and "stderr_content" entries; placeholders are used for
        anything that is missing.
        """
        super(BuildError, self).__init__("Error when building test subject")

        # Not every process error is guaranteed to carry the
        # lldb_extensions dictionary; reporting a build failure must not
        # itself fail with an AttributeError, so fall back to no extras.
        extensions = getattr(
            called_process_error, "lldb_extensions", None) or {}
        self.command = extensions.get("command", "<command unavailable>")
        self.build_error = extensions.get(
            "stderr_content", "<error output unavailable>")

    def __str__(self):
        return self.format_build_error(self.command, self.build_error)

    @staticmethod
    def format_build_error(command, command_output):
        """Returns the human-readable description of a build failure.

        @param command the build command that was run.
        @param command_output the output produced by the build command.

        @return the formatted, multi-line error description.
        """
        return ("Error when building test subject.\n\n"
                "Build Command:\n{}\n\n"
                "Build Command Output:\n{}").format(command, command_output)
diff --git a/packages/Python/lldbsuite/test_event/dotest_channels.py b/packages/Python/lldbsuite/test_event/dotest_channels.py
new file mode 100644
index 000000000000..d69720e4f66a
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/dotest_channels.py
@@ -0,0 +1,207 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+
+Sync lldb and related source from a local machine to a remote machine.
+
+This facilitates working on the lldb sourcecode on multiple machines
+and multiple OS types, verifying changes across all.
+
+
+This module provides asyncore channels used within the LLDB test
+framework.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+
+# System modules
+import asyncore
+import socket
+
+# Third-party modules
+from six.moves import cPickle
+
+# LLDB modules
+
+
class UnpicklingForwardingReaderChannel(asyncore.dispatcher):
    """Provides an unpickling, forwarding asyncore dispatch channel reader.

    Inferior dotest.py processes with side-channel-based test results will
    send test result event data in a pickled format, one event at a time.
    This class supports reconstructing the pickled data and forwarding it
    on to its final destination.

    The channel data is written in the form:
    {num_payload_bytes}{payload_bytes}

    where num_payload_bytes is a 4-byte unsigned integer in network byte
    order (struct format "!I") and payload_bytes is the pickled event.

    The bulk of this class is devoted to reading and parsing out
    the payload bytes.
    """

    # Size in bytes of the length header that precedes every payload.
    _HEADER_LENGTH = 4

    def __init__(self, file_object, async_map, forwarding_func):
        """Wraps a connected socket and acknowledges the connection.

        @param file_object the connected socket to read from.
        @param async_map the asyncore map this channel registers with.
        @param forwarding_func callable invoked with each unpickled event.

        @raise ValueError if forwarding_func is None.
        """
        # Validate before registering with asyncore so that a bad call
        # does not leave a half-constructed channel in the async map.
        if forwarding_func is None:
            # This whole class is useless if we do nothing with the
            # unpickled results.
            raise ValueError("forwarding function must be set")

        asyncore.dispatcher.__init__(self, sock=file_object, map=async_map)

        self.header_contents = b""
        self.packet_bytes_remaining = 0
        self.reading_header = True
        self.ibuffer = b''
        self.forwarding_func = forwarding_func

        # Initiate all connections by sending an ack.  This allows
        # the initiators of the socket to await this to ensure
        # that this end is up and running (and therefore already
        # into the async map).
        ack_bytes = b'*'
        file_object.send(ack_bytes)

    def deserialize_payload(self):
        """Unpickles the collected input buffer bytes and forwards."""
        if len(self.ibuffer) > 0:
            # NOTE(security): unpickling can execute arbitrary code.  The
            # bytes read here must only ever come from trusted senders.
            self.forwarding_func(cPickle.loads(self.ibuffer))
            self.ibuffer = b''

    def consume_header_bytes(self, data):
        """Consumes header bytes from the front of data.
        @param data the incoming data stream bytes
        @return any data leftover after consuming header bytes, or None
        if the header is still incomplete (all data was consumed).
        """
        # We're done if there is no content.
        if not data:
            return None

        full_header_len = self._HEADER_LENGTH

        assert len(self.header_contents) < full_header_len

        bytes_needed = full_header_len - len(self.header_contents)
        header_bytes_avail = min(bytes_needed, len(data))
        self.header_contents += data[:header_bytes_avail]
        if len(self.header_contents) == full_header_len:
            import struct
            # End of header: decode the payload length and switch to
            # payload-reading mode.
            self.packet_bytes_remaining = struct.unpack(
                "!I", self.header_contents)[0]
            self.header_contents = b""
            self.reading_header = False
            return data[header_bytes_avail:]

        # If we made it here, we've exhausted the data and
        # we're still parsing header content.
        return None

    def consume_payload_bytes(self, data):
        """Consumes payload bytes from the front of data.
        @param data the incoming data stream bytes
        @return any data leftover after consuming remaining payload bytes,
        or None if all of data was consumed.
        """
        if not data:
            # We're done and there's nothing to do.
            return None

        # Take no more than what the current packet still needs; anything
        # beyond that belongs to the next packet's header.
        consumed = min(len(data), self.packet_bytes_remaining)
        self.ibuffer += data[:consumed]
        self.packet_bytes_remaining -= consumed
        leftover = data[consumed:]

        # If we're no longer waiting for payload bytes,
        # we flip back to parsing header bytes and we
        # unpickle the payload contents.
        if self.packet_bytes_remaining < 1:
            self.reading_header = True
            self.deserialize_payload()

        return leftover if leftover else None

    def _dispatch_bytes(self, data):
        """Feeds received bytes through the header/payload state machine."""
        while data:
            # If we're reading the header, gather header bytes.
            if self.reading_header:
                data = self.consume_header_bytes(data)
            else:
                data = self.consume_payload_bytes(data)

    def handle_read(self):
        """Reads available socket data and processes any complete events.

        Socket and other errors are reported on stdout and re-raised.
        """
        # Read some data from the socket.
        try:
            data = self.recv(8192)
            # print('driver socket READ: %d bytes' % len(data))
        except socket.error as socket_error:
            print(
                "\nINFO: received socket error when reading data "
                "from test inferior:\n{}".format(socket_error))
            raise
        except Exception as general_exception:
            print(
                "\nERROR: received non-socket error when reading data "
                "from the test inferior:\n{}".format(general_exception))
            raise

        # Consume the message content.
        self._dispatch_bytes(data)

    def handle_close(self):
        # print("socket reader: closing port")
        self.close()
+
+
class UnpicklingForwardingListenerChannel(asyncore.dispatcher):
    """Provides a socket listener asyncore channel for unpickling/forwarding.

    This channel will listen on a socket port (use 0 for host-selected).  Any
    client that connects will have an UnpicklingForwardingReaderChannel handle
    communication over the connection.

    The dotest parallel test runners, when collecting test results, open the
    test results side channel over a socket.  This channel handles connections
    from inferiors back to the test runner.  Each worker fires up a listener
    for each inferior invocation.  This simplifies the asyncore.loop() usage,
    one of the reasons for implementing with asyncore.

    NOTE(review): this listener is described as shutting down once a single
    connection is made to it, but nothing in this class closes the listener
    after an accept -- confirm whether the owner is expected to do so.
    """

    def __init__(self, async_map, host, port, backlog_count, forwarding_func):
        """Creates the listening socket and registers it with asyncore.

        @param async_map the asyncore map used by this listener and by the
        reader channels it creates.
        @param host the host/interface to bind to.
        @param port the port to bind to (0 lets the host pick one; the
        bound address is available afterwards as self.address).
        @param backlog_count the backlog passed to listen().
        @param forwarding_func callable invoked with each unpickled event.

        @raise ValueError if forwarding_func is None.
        """
        # Validate first: raising after the socket is bound and registered
        # would leak a listening socket inside the async map.
        if forwarding_func is None:
            # This whole class is useless if we do nothing with the
            # unpickled results.
            raise ValueError("forwarding function must be set")

        asyncore.dispatcher.__init__(self, map=async_map)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.address = self.socket.getsockname()
        self.listen(backlog_count)
        self.handler = None
        self.async_map = async_map
        self.forwarding_func = forwarding_func

    def handle_accept(self):
        """Accepts an incoming connection and attaches a reader channel."""
        # dispatcher.accept() returns None rather than raising when the
        # pending connection vanished before it could be accepted.
        accepted = self.accept()
        if accepted is None:
            return

        (sock, addr) = accepted
        if sock and addr:
            # print('Incoming connection from %s' % repr(addr))
            self.handler = UnpicklingForwardingReaderChannel(
                sock, self.async_map, self.forwarding_func)

    def handle_close(self):
        self.close()
diff --git a/packages/Python/lldbsuite/test_event/event_builder.py b/packages/Python/lldbsuite/test_event/event_builder.py
new file mode 100644
index 000000000000..aabbd986bd70
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/event_builder.py
@@ -0,0 +1,475 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+
+Provides a class to build Python test event data structures.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+# System modules
+import inspect
+import time
+import traceback
+
+# Third-party modules
+
+# LLDB modules
+from . import build_exception
+
class EventBuilder(object):
    """Helper class to build test result event dictionaries."""

    # Entries merged into every event created by bare_event().  Set through
    # add_entries_to_all_events(); None means there are no extra entries.
    BASE_DICTIONARY = None

    # Test Event Types
    TYPE_JOB_RESULT = "job_result"
    TYPE_TEST_RESULT = "test_result"
    TYPE_TEST_START = "test_start"
    TYPE_MARK_TEST_RERUN_ELIGIBLE = "test_eligible_for_rerun"
    TYPE_MARK_TEST_EXPECTED_FAILURE = "test_expected_failure"
    TYPE_SESSION_TERMINATE = "terminate"

    RESULT_TYPES = {TYPE_JOB_RESULT, TYPE_TEST_RESULT}

    # Test/Job Status Tags
    STATUS_EXCEPTIONAL_EXIT = "exceptional_exit"
    STATUS_SUCCESS = "success"
    STATUS_FAILURE = "failure"
    STATUS_EXPECTED_FAILURE = "expected_failure"
    STATUS_EXPECTED_TIMEOUT = "expected_timeout"
    STATUS_UNEXPECTED_SUCCESS = "unexpected_success"
    STATUS_SKIP = "skip"
    STATUS_ERROR = "error"
    STATUS_TIMEOUT = "timeout"

    # Test methods or jobs with a status matching any of these
    # status values will cause a testrun failure, unless
    # the test methods rerun and do not trigger an issue when rerun.
    TESTRUN_ERROR_STATUS_VALUES = {
        STATUS_ERROR,
        STATUS_EXCEPTIONAL_EXIT,
        STATUS_FAILURE,
        STATUS_TIMEOUT}

    @staticmethod
    def _get_test_name_info(test):
        """Returns (test-class-name, test-method-name) from a test case instance.

        @param test a unittest.TestCase instance.

        @return tuple containing (test class name, test method name)
        """
        test_class_components = test.id().split(".")
        test_class_name = ".".join(test_class_components[:-1])
        test_name = test_class_components[-1]
        return test_class_name, test_name

    @staticmethod
    def bare_event(event_type):
        """Creates an event with default additions, event type and timestamp.

        @param event_type the value set for the "event" key, used
        to distinguish events.

        @returns an event dictionary with all default additions, the "event"
        key set to the passed in event_type, and the event_time value set to
        time.time().
        """
        if EventBuilder.BASE_DICTIONARY is not None:
            # Start with a copy of the "always include" entries.
            event = dict(EventBuilder.BASE_DICTIONARY)
        else:
            event = {}

        event.update({
            "event": event_type,
            "event_time": time.time()
        })
        return event

    @staticmethod
    def _assert_is_python_sourcefile(test_filename):
        """Verifies that a non-None filename has a ".py" extension.

        @param test_filename the filename to check; may be None.

        @return test_filename, unchanged.

        @raise Exception if test_filename is not None and does not end
        with ".py".
        """
        if test_filename is not None:
            if not test_filename.endswith(".py"):
                raise Exception(
                    "source python filename has unexpected extension: {}"
                    .format(test_filename))
        return test_filename

    @staticmethod
    def _event_dictionary_common(test, event_type):
        """Returns an event dictionary setup with values for the given event type.

        @param test the unittest.TestCase instance

        @param event_type the name of the event type (string).

        @return event dictionary with common event fields set.
        """
        test_class_name, test_name = EventBuilder._get_test_name_info(test)

        # Determine the filename for the test case.  If there is an attribute
        # for it, use it.  Otherwise, determine from the TestCase class path.
        if hasattr(test, "test_filename"):
            test_filename = EventBuilder._assert_is_python_sourcefile(
                test.test_filename)
        else:
            test_filename = EventBuilder._assert_is_python_sourcefile(
                inspect.getsourcefile(test.__class__))

        event = EventBuilder.bare_event(event_type)
        event.update({
            "test_class": test_class_name,
            "test_name": test_name,
            "test_filename": test_filename
        })

        return event

    @staticmethod
    def _error_tuple_class(error_tuple):
        """Returns the unittest error tuple's error class as a string.

        @param error_tuple the error tuple provided by the test framework.

        @return the name of the error type (typically an exception) raised
        by the test framework, qualified with its module name when the
        module can be determined.
        """
        type_var = error_tuple[0]
        module = inspect.getmodule(type_var)
        if module:
            return "{}.{}".format(module.__name__, type_var.__name__)
        else:
            return type_var.__name__

    @staticmethod
    def _error_tuple_message(error_tuple):
        """Returns the unittest error tuple's error message.

        @param error_tuple the error tuple provided by the test framework.

        @return the error message provided by the test framework.
        """
        return str(error_tuple[1])

    @staticmethod
    def _error_tuple_traceback(error_tuple):
        """Returns the unittest error tuple's traceback.

        @param error_tuple the error tuple provided by the test framework.

        @return the traceback entry (third element) of the error tuple.
        """
        return error_tuple[2]

    @staticmethod
    def _event_dictionary_test_result(test, status):
        """Returns an event dictionary with common test result fields set.

        @param test a unittest.TestCase instance.

        @param status the status/result of the test
        (e.g. "success", "failure", etc.)

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_common(
            test, EventBuilder.TYPE_TEST_RESULT)
        event["status"] = status
        return event

    @staticmethod
    def _event_dictionary_issue(test, status, error_tuple):
        """Returns an event dictionary with common issue-containing test result
        fields set.

        @param test a unittest.TestCase instance.

        @param status the status/result of the test
        (e.g. "success", "failure", etc.)

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_test_result(test, status)
        event["issue_class"] = EventBuilder._error_tuple_class(error_tuple)
        event["issue_message"] = EventBuilder._error_tuple_message(error_tuple)
        backtrace = EventBuilder._error_tuple_traceback(error_tuple)
        if backtrace is not None:
            event["issue_backtrace"] = traceback.format_tb(backtrace)
        return event

    @staticmethod
    def event_for_start(test):
        """Returns an event dictionary for the test start event.

        @param test a unittest.TestCase instance.

        @return the event dictionary
        """
        return EventBuilder._event_dictionary_common(
            test, EventBuilder.TYPE_TEST_START)

    @staticmethod
    def event_for_success(test):
        """Returns an event dictionary for a successful test.

        @param test a unittest.TestCase instance.

        @return the event dictionary
        """
        return EventBuilder._event_dictionary_test_result(
            test, EventBuilder.STATUS_SUCCESS)

    @staticmethod
    def event_for_unexpected_success(test, bugnumber):
        """Returns an event dictionary for a test that succeeded but was
        expected to fail.

        @param test a unittest.TestCase instance.

        @param bugnumber the issue identifier for the bug tracking the
        fix request for the test expected to fail (but is in fact
        passing here).

        @return the event dictionary

        """
        event = EventBuilder._event_dictionary_test_result(
            test, EventBuilder.STATUS_UNEXPECTED_SUCCESS)
        if bugnumber:
            event["bugnumber"] = str(bugnumber)
        return event

    @staticmethod
    def event_for_failure(test, error_tuple):
        """Returns an event dictionary for a test that failed.

        @param test a unittest.TestCase instance.

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).

        @return the event dictionary
        """
        return EventBuilder._event_dictionary_issue(
            test, EventBuilder.STATUS_FAILURE, error_tuple)

    @staticmethod
    def event_for_expected_failure(test, error_tuple, bugnumber):
        """Returns an event dictionary for a test that failed as expected.

        @param test a unittest.TestCase instance.

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).

        @param bugnumber the issue identifier for the bug tracking the
        fix request for the test expected to fail.

        @return the event dictionary

        """
        event = EventBuilder._event_dictionary_issue(
            test, EventBuilder.STATUS_EXPECTED_FAILURE, error_tuple)
        if bugnumber:
            event["bugnumber"] = str(bugnumber)
        return event

    @staticmethod
    def event_for_skip(test, reason):
        """Returns an event dictionary for a test that was skipped.

        @param test a unittest.TestCase instance.

        @param reason the reason why the test is being skipped.

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_test_result(
            test, EventBuilder.STATUS_SKIP)
        event["skip_reason"] = reason
        return event

    @staticmethod
    def event_for_error(test, error_tuple):
        """Returns an event dictionary for a test that hit a test execution error.

        @param test a unittest.TestCase instance.

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_issue(
            test, EventBuilder.STATUS_ERROR, error_tuple)
        event["issue_phase"] = "test"
        return event

    @staticmethod
    def event_for_build_error(test, error_tuple):
        """Returns an event dictionary for a test that hit an error
        during the build phase.

        @param test a unittest.TestCase instance.

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).  The error
        entry must provide "command" and "build_error" attributes.

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_issue(
            test, EventBuilder.STATUS_ERROR, error_tuple)
        event["issue_phase"] = "build"

        build_error = error_tuple[1]
        event["build_command"] = build_error.command
        event["build_error"] = build_error.build_error
        return event

    @staticmethod
    def event_for_cleanup_error(test, error_tuple):
        """Returns an event dictionary for a test that hit a test execution error
        during the test cleanup phase.

        @param test a unittest.TestCase instance.

        @param error_tuple the error tuple as reported by the test runner.
        This is of the form (type<error>, error, traceback).

        @return the event dictionary
        """
        event = EventBuilder._event_dictionary_issue(
            test, EventBuilder.STATUS_ERROR, error_tuple)
        event["issue_phase"] = "cleanup"
        return event

    @staticmethod
    def event_for_job_test_add_error(test_filename, exception, backtrace):
        """Creates a job result event with error status for a test file.

        @param test_filename the path to the test file, or None.
        @param exception the exception that was hit, or None.  It is
        stored as-is (not stringified) under "issue_message", and its
        class is stored under "issue_class".
        @param backtrace optional backtrace, stored as-is under
        "issue_backtrace".

        @return the event dictionary
        """
        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
        event["status"] = EventBuilder.STATUS_ERROR
        if test_filename is not None:
            event["test_filename"] = (
                EventBuilder._assert_is_python_sourcefile(test_filename))
        if exception is not None and "__class__" in dir(exception):
            event["issue_class"] = exception.__class__
        event["issue_message"] = exception
        if backtrace is not None:
            event["issue_backtrace"] = backtrace
        return event

    @staticmethod
    def event_for_job_exceptional_exit(
            pid, worker_index, exception_code, exception_description,
            test_filename, command_line):
        """Creates an event for a job (i.e. process) exit due to signal.

        @param pid the process id for the job that failed
        @param worker_index optional id for the job queue running the process
        @param exception_code optional code
        (e.g. SIGTERM integer signal number)
        @param exception_description optional string containing symbolic
        representation of the issue (e.g. "SIGTERM")
        @param test_filename the path to the test filename that exited
        in some exceptional way.
        @param command_line the Popen()-style list provided as the command line
        for the process that exited exceptionally.

        @return an event dictionary coding the job completion description.
        """
        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
        event["status"] = EventBuilder.STATUS_EXCEPTIONAL_EXIT
        if pid is not None:
            event["pid"] = pid
        if worker_index is not None:
            event["worker_index"] = int(worker_index)
        if exception_code is not None:
            event["exception_code"] = exception_code
        if exception_description is not None:
            event["exception_description"] = exception_description
        if test_filename is not None:
            event["test_filename"] = (
                EventBuilder._assert_is_python_sourcefile(test_filename))
        if command_line is not None:
            event["command_line"] = command_line
        return event

    @staticmethod
    def event_for_job_timeout(pid, worker_index, test_filename, command_line):
        """Creates an event for a job (i.e. process) timeout.

        @param pid the process id for the job that timed out
        @param worker_index optional id for the job queue running the process
        @param test_filename the path to the test filename that timed out.
        @param command_line the Popen-style list provided as the command line
        for the process that timed out.

        @return an event dictionary coding the job completion description.
        """
        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
        # Use the shared status constant rather than a repeated literal.
        event["status"] = EventBuilder.STATUS_TIMEOUT
        if pid is not None:
            event["pid"] = pid
        if worker_index is not None:
            event["worker_index"] = int(worker_index)
        if test_filename is not None:
            event["test_filename"] = (
                EventBuilder._assert_is_python_sourcefile(test_filename))
        if command_line is not None:
            event["command_line"] = command_line
        return event

    @staticmethod
    def event_for_mark_test_rerun_eligible(test):
        """Creates an event that indicates the specified test is explicitly
        eligible for rerun.

        Note there is a mode that will enable test rerun eligibility at the
        global level.  These markings for explicit rerun eligibility are
        intended for the mode of running where only explicitly re-runnable
        tests are rerun upon hitting an issue.

        @param test the TestCase instance to which this pertains.

        @return an event that specifies the given test as being eligible to
        be rerun.
        """
        event = EventBuilder._event_dictionary_common(
            test,
            EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE)
        return event

    @staticmethod
    def event_for_mark_test_expected_failure(test):
        """Creates an event that indicates the specified test is expected
        to fail.

        @param test the TestCase instance to which this pertains.

        @return an event that specifies the given test is expected to fail.
        """
        event = EventBuilder._event_dictionary_common(
            test,
            EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE)
        return event

    @staticmethod
    def add_entries_to_all_events(entries_dict):
        """Specifies a dictionary of entries to add to all test events.

        This provides a mechanism for, say, a parallel test runner to
        indicate to each inferior dotest.py that it should add a
        worker index to each.

        Calling this method replaces all previous entries added
        by a prior call to this.

        Event build methods will overwrite any entries that collide.
        Thus, the passed in dictionary is the base, which gets merged
        over by event building when keys collide.

        @param entries_dict a dictionary containing key and value
        pairs that should be merged into all events created by the
        event generator.  May be None to clear out any extra entries.
        """
        if entries_dict is None:
            # dict(None) raises TypeError; None is the documented way to
            # clear the extra entries.
            EventBuilder.BASE_DICTIONARY = None
        else:
            # Copy so later changes to the caller's dictionary do not
            # leak into subsequently created events.
            EventBuilder.BASE_DICTIONARY = dict(entries_dict)
diff --git a/packages/Python/lldbsuite/test_event/formatter/__init__.py b/packages/Python/lldbsuite/test_event/formatter/__init__.py
new file mode 100644
index 000000000000..556370ebb9de
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/__init__.py
@@ -0,0 +1,162 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+# System modules
+import importlib
+import socket
+import sys
+
+# Third-party modules
+
+# LLDB modules
+
+
+# Ignore method count on DTOs.
+# pylint: disable=too-few-public-methods
class FormatterConfig(object):
    """Provides formatter configuration info to create_results_formatter()."""

    def __init__(self):
        # Every setting starts out unset; callers fill in what they need.
        self.filename = self.port = None
        self.formatter_name = self.formatter_options = None
+
+
+# Ignore method count on DTOs.
+# pylint: disable=too-few-public-methods
class CreatedFormatter(object):
    """Provides transfer object for returns from create_results_formatter()."""

    def __init__(self, formatter, cleanup_func):
        # Store the pair exactly as handed in; no validation is done here.
        self.formatter, self.cleanup_func = formatter, cleanup_func
+
+
def create_results_formatter(config):
    """Sets up a test results formatter.

    The output destination is chosen from config.filename ("stdout" and
    "stderr" select the corresponding stream, anything else is opened for
    writing), otherwise from config.port (a localhost socket), otherwise
    stdout when only a formatter name is given.

    @param config an instance of FormatterConfig
    that indicates how to setup the ResultsFormatter.

    @return an instance of CreatedFormatter, or None when the config
    selects neither an output destination nor a formatter name.
    """

    def create_socket(port):
        """Creates a socket to the localhost on the given port.

        @param port the port number of the listening port on
        the localhost.

        @return (socket object, socket closing function)
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(("localhost", port))

            # Wait for the ack from the listener side.
            # This is needed to prevent a race condition
            # in the main dosep.py processing loop: we
            # can't allow a worker queue thread to die
            # that has outstanding messages to a listener
            # socket before the listener socket asyncore
            # listener socket gets spun up; otherwise,
            # we lose the test result info.
            read_bytes = sock.recv(1)
            if read_bytes != b'*':
                raise Exception(
                    "listening socket did not respond with ack byte: "
                    "response={}".format(read_bytes))
        except BaseException:
            # Don't leak the socket when the connection or handshake fails.
            sock.close()
            raise

        def socket_closer():
            """Close down an opened socket properly."""
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                # Release the descriptor even if shutdown() fails (e.g.
                # the peer has already gone away).
                sock.close()

        return sock, socket_closer

    default_formatter_name = None
    results_file_object = None
    cleanup_func = None

    file_is_stream = False
    if config.filename:
        # Open the results file for writing.
        if config.filename == 'stdout':
            results_file_object = sys.stdout
            cleanup_func = None
        elif config.filename == 'stderr':
            results_file_object = sys.stderr
            cleanup_func = None
        else:
            results_file_object = open(config.filename, "w")
            cleanup_func = results_file_object.close
        default_formatter_name = (
            "lldbsuite.test_event.formatter.xunit.XunitFormatter")
    elif config.port:
        # Connect to the specified localhost port.
        results_file_object, cleanup_func = create_socket(config.port)
        default_formatter_name = (
            "lldbsuite.test_event.formatter.pickled.RawPickledFormatter")
        file_is_stream = True

    # If we have a results formatter name specified and we didn't specify
    # a results file, we should use stdout.
    if config.formatter_name is not None and results_file_object is None:
        # Use stdout.
        results_file_object = sys.stdout
        cleanup_func = None

    if not results_file_object:
        return None

    try:
        # We care about the formatter.  Choose user-specified or, if
        # none specified, use the default for the output type.
        if config.formatter_name:
            formatter_name = config.formatter_name
        else:
            formatter_name = default_formatter_name

        # Create an instance of the class.
        # First figure out the package/module.
        components = formatter_name.split(".")
        module = importlib.import_module(".".join(components[:-1]))

        # Create the class name we need to load.
        cls = getattr(module, components[-1])

        # Handle formatter options for the results formatter class.
        formatter_arg_parser = cls.arg_parser()
        command_line_options = config.formatter_options or []
        formatter_options = formatter_arg_parser.parse_args(
            command_line_options)

        # Create the TestResultsFormatter given the processed options.
        results_formatter_object = cls(
            results_file_object,
            formatter_options,
            file_is_stream)
    except BaseException:
        # The formatter could not be created (bad name, bad options,
        # constructor failure): release the file/socket opened above
        # instead of leaking it, then report the original problem.
        if cleanup_func is not None:
            cleanup_func()
        raise

    def shutdown_formatter():
        """Shuts down the formatter when it is no longer needed."""
        # Tell the formatter to write out anything it may have
        # been saving until the very end (e.g. xUnit results
        # can't complete its output until this point).
        results_formatter_object.send_terminate_as_needed()

        # And now close out the output file-like object.
        if cleanup_func is not None:
            cleanup_func()

    return CreatedFormatter(
        results_formatter_object,
        shutdown_formatter)
diff --git a/packages/Python/lldbsuite/test_event/formatter/curses.py b/packages/Python/lldbsuite/test_event/formatter/curses.py
new file mode 100644
index 000000000000..4e89fa9bf01e
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/curses.py
@@ -0,0 +1,229 @@
+"""
+ The LLVM Compiler Infrastructure
+
+ This file is distributed under the University of Illinois Open Source
+ License. See LICENSE.TXT for details.
+"""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+# System modules
+import curses
+import datetime
+import math
+import sys
+import time
+
+# Third-party modules
+
+# LLDB modules
+from lldbsuite.test import lldbcurses
+
+from . import results_formatter
+from ..event_builder import EventBuilder
+
+
class Curses(results_formatter.ResultsFormatter):
    """Receives live results from running tests and shows them in a curses GUI.

    The display consists of a "Jobs" panel (one line per worker), a
    "Results" panel (one line per finished test) and a one-line status bar.
    The panels are created when the 'initialize' event arrives, because that
    event carries the worker count used to size the "Jobs" panel.
    """

    # Number of per-worker slots allocated up front.  More slots are added
    # on the 'initialize' event when the run reports a larger worker count.
    INITIAL_JOB_SLOTS = 64

    def __init__(self, out_file, options, file_is_stream):
        """Initializes curses and the bookkeeping used by the panels.

        @param out_file passed through to the parent formatter.
        @param options passed through to the parent formatter.
        @param file_is_stream passed through to the parent formatter.

        Any exception raised while setting up curses is re-raised after
        the terminal has been restored.
        """
        # Initialize the parent
        super(Curses, self).__init__(out_file, options, file_is_stream)
        self.using_terminal = True
        self.have_curses = True
        self.initialize_event = None
        # Most recent job_begin / test_start event, indexed by worker index.
        self.jobs = [None] * self.INITIAL_JOB_SLOTS
        self.job_tests = [None] * self.INITIAL_JOB_SLOTS
        # [test_start event, test_result event] pairs in arrival order.
        self.results = list()
        try:
            # NOTE: "intialize_curses" is the spelling this code calls in
            # lldbcurses; it is kept as-is.
            self.main_window = lldbcurses.intialize_curses()
            self.main_window.add_key_action(
                '\t',
                self.main_window.select_next_first_responder,
                "Switch between views that can respond to keyboard input")
            self.main_window.refresh()
            self.job_panel = None
            self.results_panel = None
            self.status_panel = None
            self.info_panel = None
            self.hide_status_list = list()
            self.start_time = time.time()
        except:
            # Deliberately catch everything (including KeyboardInterrupt)
            # so the terminal is restored, then re-raise unchanged.
            self.have_curses = False
            lldbcurses.terminate_curses()
            self.using_terminal = False
            print("Unexpected error:", sys.exc_info()[0])
            raise

        self.line_dict = dict()

    def status_to_short_str(self, status, test_event):
        """Returns the one-character label shown in the results list.

        @param status the status value of a test result event.
        @param test_event the result event; only consulted for error
        results, where an "issue_phase" of "build" selects 'B'.

        @return the single-character label, or status itself when it is
        not one of the EventBuilder status values handled here.
        """
        if status == EventBuilder.STATUS_ERROR:
            if test_event.get("issue_phase", None) == "build":
                # Build failure
                return 'B'
            return 'E'

        short_strings = {
            EventBuilder.STATUS_SUCCESS: '.',
            EventBuilder.STATUS_FAILURE: 'F',
            EventBuilder.STATUS_UNEXPECTED_SUCCESS: '?',
            EventBuilder.STATUS_EXPECTED_FAILURE: 'X',
            EventBuilder.STATUS_SKIP: 'S',
            EventBuilder.STATUS_TIMEOUT: 'T',
            EventBuilder.STATUS_EXPECTED_TIMEOUT: 't',
        }
        return short_strings.get(status, status)

    def show_info_panel(self):
        """Shows details for the result currently selected in "Results"."""
        selected_idx = self.results_panel.get_selected_idx()
        if not 0 <= selected_idx < len(self.results):
            return

        if self.info_panel is None:
            info_frame = self.results_panel.get_contained_rect(
                top_inset=10, left_inset=10, right_inset=10, height=30)
            self.info_panel = lldbcurses.BoxedPanel(
                info_frame, "Result Details")
            # Key -1 registers an action for any key, so any key press
            # hides this panel again.
            self.info_panel.add_key_action(
                -1, self.hide_info_panel, 'Hide the info panel')
            self.info_panel.top()
        else:
            self.info_panel.show()

        self.main_window.push_first_responder(self.info_panel)
        test_start, test_result = self.results[selected_idx][:2]
        self.info_panel.set_line(
            0, "File: %s" % (test_start['test_filename']))
        self.info_panel.set_line(
            1, "Test: %s.%s" % (test_start['test_class'],
                                test_start['test_name']))
        self.info_panel.set_line(
            2, "Time: %s" % (test_result['elapsed_time']))
        self.info_panel.set_line(
            3, "Status: %s" % (test_result['status']))

    def hide_info_panel(self):
        """Hides the details panel and gives keyboard focus back."""
        self.main_window.pop_first_responder(self.info_panel)
        self.info_panel.hide()
        self.main_window.refresh()

    def toggle_status(self, status):
        """Toggles whether results with the given status are listed.

        @param status the result status to show/hide in the "Results"
        panel; a false value is ignored.
        """
        if not status:
            return
        if status in self.hide_status_list:
            self.hide_status_list.remove(status)
        else:
            self.hide_status_list.append(status)
        self.update_results()

    def update_results(self, update=True):
        """Rebuilds the "Results" panel from the stored results.

        Called after a status category has been shown/hidden so the list
        reflects what the user wants to see.

        @param update when True the main window is refreshed afterwards.
        """
        self.results_panel.clear(update=False)
        for result in self.results:
            test_result = result[1]
            status = test_result['status']
            if status in self.hide_status_list:
                continue
            name = test_result['test_class'] + '.' + test_result['test_name']
            self.results_panel.append_line(
                '%s (%6.2f sec) %s' % (
                    self.status_to_short_str(status, test_result),
                    test_result['elapsed_time'],
                    name))
        if update:
            self.main_window.refresh()

    @staticmethod
    def _pid_prefix(test_event):
        """Returns the 'pid: NNNNN ' job-line prefix, or '' without a pid."""
        if 'pid' in test_event:
            return 'pid: %5d ' % (test_event['pid'])
        return ''

    def _ensure_job_slots(self, count):
        """Grows the per-worker lists so they can hold count workers."""
        for slots in (self.jobs, self.job_tests):
            missing = count - len(slots)
            if missing > 0:
                slots.extend([None] * missing)

    def _make_status_toggler(self, status):
        """Returns a no-argument callable toggling visibility of status."""
        return lambda: self.toggle_status(status)

    def _create_panels(self, test_event):
        """Creates the job, results and status panels for 'initialize'."""
        self.initialize_event = test_event
        num_jobs = test_event['worker_count']
        # Worker indexes are used to index self.jobs / self.job_tests, so
        # make sure runs with more workers than the initial slot count do
        # not index past the end of those lists.
        self._ensure_job_slots(num_jobs)

        job_frame = self.main_window.get_contained_rect(height=num_jobs+2)
        results_frame = self.main_window.get_contained_rect(
            top_inset=num_jobs+2, bottom_inset=1)
        status_frame = self.main_window.get_contained_rect(
            height=1, top_inset=self.main_window.get_size().h-1)
        self.job_panel = lldbcurses.BoxedPanel(frame=job_frame, title="Jobs")
        self.results_panel = lldbcurses.BoxedPanel(
            frame=results_frame, title="Results")

        navigation_actions = (
            (curses.KEY_UP, self.results_panel.select_prev,
             "Select the previous list entry"),
            (curses.KEY_DOWN, self.results_panel.select_next,
             "Select the next list entry"),
            (curses.KEY_HOME, self.results_panel.scroll_begin,
             "Scroll to the start of the list"),
            (curses.KEY_END, self.results_panel.scroll_end,
             "Scroll to the end of the list"),
            (curses.KEY_ENTER, self.show_info_panel,
             "Display info for the selected result item"),
        )
        for key, action, description in navigation_actions:
            self.results_panel.add_key_action(key, action, description)

        toggle_actions = (
            ('.', EventBuilder.STATUS_SUCCESS, 'success'),
            ('e', EventBuilder.STATUS_ERROR, 'error'),
            ('f', EventBuilder.STATUS_FAILURE, 'failure'),
            ('s', EventBuilder.STATUS_SKIP, 'skip'),
            ('x', EventBuilder.STATUS_EXPECTED_FAILURE, 'expected_failure'),
            ('?', EventBuilder.STATUS_UNEXPECTED_SUCCESS,
             'unexpected_success'),
        )
        for key, status, label in toggle_actions:
            self.results_panel.add_key_action(
                key,
                self._make_status_toggler(status),
                "Toggle showing/hiding tests whose status is '%s'" % label)

        self.status_panel = lldbcurses.StatusPanel(frame=status_frame)

        self.main_window.add_child(self.job_panel)
        self.main_window.add_child(self.results_panel)
        self.main_window.add_child(self.status_panel)
        self.main_window.set_first_responder(self.results_panel)

        self.status_panel.add_status_item(
            name="time", title="Elapsed", format="%s", width=20,
            value="0:00:00", update=False)
        # (name, title, width, update) -- the update flags are reproduced
        # exactly as they were passed before.
        counters = (
            (EventBuilder.STATUS_SUCCESS, "Success", 20, False),
            (EventBuilder.STATUS_FAILURE, "Failure", 20, False),
            (EventBuilder.STATUS_ERROR, "Error", 20, False),
            (EventBuilder.STATUS_SKIP, "Skipped", 20, True),
            (EventBuilder.STATUS_EXPECTED_FAILURE, "Expected Failure",
             30, False),
            (EventBuilder.STATUS_UNEXPECTED_SUCCESS, "Unexpected Success",
             30, False),
        )
        for name, title, width, update in counters:
            self.status_panel.add_status_item(
                name=name, title=title, format="%u", width=width,
                value=0, update=update)
        self.main_window.refresh()

    def handle_event(self, test_event):
        """Updates the curses display for one test event.

        The parent implementation runs first (under the formatter lock);
        the panels are then updated according to the event type.

        @param test_event the test event dictionary.
        """
        with self.lock:
            super(Curses, self).handle_event(test_event)
            if not self.have_curses or 'event' not in test_event:
                return

            worker_index = test_event.get('worker_index', -1)
            check_for_one_key = True
            event = test_event['event']
            if self.status_panel:
                elapsed = math.floor(time.time() - self.start_time)
                self.status_panel.update_status(
                    'time', str(datetime.timedelta(seconds=elapsed)))

            if event == 'test_start':
                name = test_event['test_class'] + '.' + test_event['test_name']
                self.job_tests[worker_index] = test_event
                self.job_panel.set_line(
                    worker_index, self._pid_prefix(test_event) + name)
                self.main_window.refresh()
            elif event == 'test_result':
                status = test_event['status']
                self.status_panel.increment_status(status)
                self.job_panel.set_line(
                    worker_index, self._pid_prefix(test_event))
                name = test_event['test_class'] + '.' + test_event['test_name']
                start_event = self.job_tests[worker_index]
                elapsed_time = (
                    test_event['event_time'] - start_event['event_time'])
                if status not in self.hide_status_list:
                    self.results_panel.append_line(
                        '%s (%6.2f sec) %s' % (
                            self.status_to_short_str(status, test_event),
                            elapsed_time,
                            name))
                self.main_window.refresh()
                # Append the result pairs
                test_event['elapsed_time'] = elapsed_time
                self.results.append([start_event, test_event])
                self.job_tests[worker_index] = ''
            elif event == 'job_begin':
                self.jobs[worker_index] = test_event
                self.job_panel.set_line(
                    worker_index, self._pid_prefix(test_event))
            elif event == 'job_end':
                self.jobs[worker_index] = ''
                self.job_panel.set_line(worker_index, '')
            elif event == 'initialize':
                self._create_panels(test_event)
            elif event == 'terminate':
                lldbcurses.terminate_curses()
                check_for_one_key = False
                self.using_terminal = False

            # Check for 1 keypress with no delay
            if check_for_one_key:
                self.main_window.key_event_loop(0, 1)
diff --git a/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py b/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py
new file mode 100644
index 000000000000..d42dcb18bb4f
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py
@@ -0,0 +1,23 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+# System modules
+import pprint
+
+# Our modules
+from .results_formatter import ResultsFormatter
+
+
class DumpFormatter(ResultsFormatter):
    """Writes each event to the output file as a pretty-printed dictionary."""

    def handle_event(self, test_event):
        """Lets the base class process the event, then dumps it.

        @param test_event the test event dictionary to write out.
        """
        super(DumpFormatter, self).handle_event(test_event)
        rendered_event = pprint.pformat(test_event)
        self.out_file.write("\n{}\n".format(rendered_event))
diff --git a/packages/Python/lldbsuite/test_event/formatter/pickled.py b/packages/Python/lldbsuite/test_event/formatter/pickled.py
new file mode 100644
index 000000000000..6d800f6c8baa
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/pickled.py
@@ -0,0 +1,72 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+# System modules
+import os
+
+# Our modules
+from .results_formatter import ResultsFormatter
+from six.moves import cPickle
+
+
class RawPickledFormatter(ResultsFormatter):
    """Formats events as a pickled stream.

    The parallel test runner has inferiors pickle their results and send them
    over a socket back to the parallel test. The parallel test runner then
    aggregates them into the final results formatter (e.g. xUnit).
    """

    @classmethod
    def arg_parser(cls):
        """@return arg parser used to parse formatter-specific options."""
        return super(RawPickledFormatter, cls).arg_parser()

    class StreamSerializer(object):
        """Writes events to a stream-like object as length-prefixed pickles."""

        @staticmethod
        def serialize(test_event, out_file):
            """Sends one event as {4-byte big-endian length}{pickled bytes}.

            @param test_event the event dictionary to pickle.
            @param out_file the stream object; its sendall() is used when
            present, otherwise send().
            """
            import struct
            msg = cPickle.dumps(test_event)
            packet = struct.pack("!I%ds" % len(msg), len(msg), msg)
            # A socket's send() may transmit only part of the buffer, which
            # would corrupt the length-prefixed framing.  Prefer sendall()
            # and fall back to send() for objects that only provide that.
            transmit = getattr(out_file, "sendall", None)
            if transmit is None:
                transmit = out_file.send
            transmit(packet)

    class BlockSerializer(object):
        """Writes events to a file-like object as consecutive pickles."""

        @staticmethod
        def serialize(test_event, out_file):
            """Pickles one event directly into out_file."""
            cPickle.dump(test_event, out_file)

    def __init__(self, out_file, options, file_is_stream):
        """Selects the serializer matching the kind of output object.

        @param out_file the object the pickled events are written to.
        @param options the parsed formatter options.
        @param file_is_stream True selects the length-prefixed stream
        serializer; False selects plain pickle.dump() into out_file.
        """
        super(RawPickledFormatter, self).__init__(
            out_file, options, file_is_stream)
        self.pid = os.getpid()
        if file_is_stream:
            self.serializer = self.StreamSerializer()
        else:
            self.serializer = self.BlockSerializer()

    def handle_event(self, test_event):
        """Tags the event with this process's pid and serializes it.

        initialize/terminate events are rewritten in place into
        job_begin/job_end events before being sent.  Events that are None
        or carry no "event" entry are not serialized.

        @param test_event the test event dictionary (modified in place).
        """
        super(RawPickledFormatter, self).handle_event(test_event)

        if test_event is None:
            return

        # Use get() so an event without an "event" entry reaches the
        # None check below instead of raising KeyError.
        event_type = test_event.get("event", None)
        if event_type is None:
            return

        # Convert initialize/terminate events into job_begin/job_end events.
        if event_type == "initialize":
            test_event["event"] = "job_begin"
        elif event_type == "terminate":
            test_event["event"] = "job_end"

        # Tack on the pid.
        test_event["pid"] = self.pid

        # Serialize the test event.
        self.serializer.serialize(test_event, self.out_file)
diff --git a/packages/Python/lldbsuite/test_event/formatter/results_formatter.py b/packages/Python/lldbsuite/test_event/formatter/results_formatter.py
new file mode 100644
index 000000000000..3bf389b97266
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/results_formatter.py
@@ -0,0 +1,717 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+
+Provides classes used by the test results reporting infrastructure
+within the LLDB test suite.
+"""
+
+from __future__ import print_function
+from __future__ import absolute_import
+
+# System modules
+import argparse
+import os
+import sys
+import threading
+
+# Third-party modules
+
+
+# LLDB modules
+from lldbsuite.test import configuration
+from ..event_builder import EventBuilder
+
+import lldbsuite
+
+
+class ResultsFormatter(object):
+ """Provides interface to formatting test results out to a file-like object.
+
+ This class allows the LLDB test framework's raw test-related
+ events to be processed and formatted in any manner desired.
+ Test events are represented by python dictionaries, formatted
+ as produced by the EventBuilder class (see the event_builder module).
+
+ ResultFormatter instances are given a file-like object in which
+ to write their results.
+
+ ResultFormatter lifetime looks like the following:
+
+ # The result formatter is created.
+ # The argparse options dictionary is generated from calling
+ # the SomeResultFormatter.arg_parser() with the options data
+ # passed to dotest.py via the "--results-formatter-options"
+ # argument. See the help on that for syntactic requirements
+ # on getting that parsed correctly.
+ formatter = SomeResultFormatter(file_like_object, argparse_options_dict, file_is_stream)
+
+ # Single call to session start, before parsing any events.
+ formatter.begin_session()
+
+ formatter.handle_event({"event":"initialize",...})
+
+ # Zero or more calls specified for events recorded during the test session.
+ # The parallel test runner manages getting results from all the inferior
+ # dotest processes, so from a new format perspective, don't worry about
+ # that. The formatter will be presented with a single stream of events
+ # sandwiched between a single begin_session()/end_session() pair in the
+ # parallel test runner process/thread.
+ for event in zero_or_more_test_events():
+ formatter.handle_event(event)
+
+ # Single call to terminate/wrap-up. For formatters that need all the
+ # data before they can print a correct result (e.g. xUnit/JUnit),
+ # this is where the final report can be generated.
+ formatter.handle_event({"event":"terminate",...})
+
+ It is not the formatter's responsibility to close the file_like_object.
+ (i.e. do not close it).
+
+ The lldb test framework passes these test events in real time, so they
+ arrive as they come in.
+
+ In the case of the parallel test runner, the dotest inferiors
+ add a 'pid' field to the dictionary that indicates which inferior
+ pid generated the event.
+
+ Note more events may be added in the future to support richer test
+ reporting functionality. One example: creating a true flaky test
+ result category so that unexpected successes really mean the test
+ is marked incorrectly (either should be marked flaky, or is indeed
+ passing consistently now and should have the xfail marker
+ removed). In this case, a flaky_success and flaky_fail event
+ likely will be added to capture these and support reporting things
+ like percentages of flaky test passing so we can see if we're
+ making some things worse/better with regards to failure rates.
+
+ Another example: announcing all the test methods that are planned
+ to be run, so we can better support redo operations of various kinds
+ (redo all non-run tests, redo non-run tests except the one that
+ was running [perhaps crashed], etc.)
+
+ Implementers are expected to override all the public methods
+ provided in this class. See each method's docstring to see
+ expectations about when the call should be chained.
+
+ """
@classmethod
def arg_parser(cls):
    """Builds the argparse parser for formatter-specific options.

    The base parser knows a single flag, --dump-results.

    @return the argparse.ArgumentParser used to parse the options.
    """
    description = '{} options'.format(cls.__name__)
    usage = ('dotest.py --results-formatter-options='
             '"--option1 value1 [--option2 value2 [...]]"')
    dump_help = ('dump the raw results data after printing '
                 'the summary output.')

    options_parser = argparse.ArgumentParser(
        description=description, usage=usage)
    options_parser.add_argument(
        "--dump-results", action="store_true", help=dump_help)
    return options_parser
+
def __init__(self, out_file, options, file_is_stream):
    """Sets up the bookkeeping shared by all results formatters.

    @param out_file the file-like object results are written to.
    @param options the parsed formatter options.
    @param file_is_stream stored as-is on the instance.

    Raises Exception when out_file is false-y.
    """
    super(ResultsFormatter, self).__init__()
    self.out_file = out_file
    self.options = options
    self.using_terminal = False
    if not self.out_file:
        raise Exception("ResultsFormatter created with no file object")
    self.start_time_by_test = {}
    self.terminate_called = False
    self.file_is_stream = file_is_stream

    # Number of test_result events seen so far, per status value.
    self.result_status_counts = dict.fromkeys(
        (
            EventBuilder.STATUS_SUCCESS,
            EventBuilder.STATUS_EXPECTED_FAILURE,
            EventBuilder.STATUS_EXPECTED_TIMEOUT,
            EventBuilder.STATUS_SKIP,
            EventBuilder.STATUS_UNEXPECTED_SUCCESS,
            EventBuilder.STATUS_FAILURE,
            EventBuilder.STATUS_ERROR,
            EventBuilder.STATUS_TIMEOUT,
            EventBuilder.STATUS_EXCEPTIONAL_EXIT,
        ),
        0)

    # Latest test start event per worker index.  A TIMEOUT or an
    # exceptional exit reported for a worker is charged to the test
    # most recently started on that worker.
    self.started_tests_by_worker = {}

    # Latest result event per test_method/job key.
    self.result_events = {}

    # How many times a test method result replaced an earlier one.
    self.test_method_rerun_count = 0

    # Guards mutation of the inner state (counts, result events).
    # It is held only briefly: the goal is to keep inner state safe,
    # not to present an entirely consistent view to the outside.
    self.lock = threading.RLock()

    # Base filenames of tests that are expected to time out.  A
    # timeout in a test whose basename is in this set is converted
    # into an expected-timeout status rather than an issue.
    self.expected_timeouts_by_basename = set()

    # Keys of tests that announced they expect to fail.  They are
    # reported as expected failures even when they end with a failing
    # status, e.g. because they crashed or deadlocked.
    self.expected_failures = set()

    # Keys of rerun-eligible tests, as produced by _make_key().
    self.rerun_eligible_tests = set()

    # Test files that had a failing test:
    # key = test path, value = list of test methods that need a rerun.
    self.tests_for_rerun = {}
+
+ @classmethod
+ def _make_key(cls, result_event):
+ """Creates a key from a test or job result event.
+
+ This key attempts to be as unique as possible. For
+ test result events, it will be unique per test method.
+ For job events (ones not promoted to a test result event),
+ it will be unique per test case file.
+
+ @return a string-based key of the form
+ {test_filename}:{test_class}.{test_name}
+ """
+ if result_event is None:
+ return None
+ component_count = 0
+ if "test_filename" in result_event:
+ key = result_event["test_filename"]
+ component_count += 1
+ else:
+ key = "<no_filename>"
+ if "test_class" in result_event:
+ if component_count > 0:
+ key += ":"
+ key += result_event["test_class"]
+ component_count += 1
+ if "test_name" in result_event:
+ if component_count > 0:
+ key += "."
+ key += result_event["test_name"]
+ component_count += 1
+ return key
+
+ def _mark_test_as_expected_failure(self, test_result_event):
+ key = self._make_key(test_result_event)
+ if key is not None:
+ self.expected_failures.add(key)
+ else:
+ sys.stderr.write(
+ "\nerror: test marked as expected failure but "
+ "failed to create key.\n")
+
+ def _mark_test_for_rerun_eligibility(self, test_result_event):
+ key = self._make_key(test_result_event)
+ if key is not None:
+ self.rerun_eligible_tests.add(key)
+ else:
+ sys.stderr.write(
+ "\nerror: test marked for re-run eligibility but "
+ "failed to create key.\n")
+
+ def _maybe_add_test_to_rerun_list(self, result_event):
+ key = self._make_key(result_event)
+ if key is not None:
+ if (key in self.rerun_eligible_tests or
+ configuration.rerun_all_issues):
+ test_filename = result_event.get("test_filename", None)
+ if test_filename is not None:
+ test_name = result_event.get("test_name", None)
+ if test_filename not in self.tests_for_rerun:
+ self.tests_for_rerun[test_filename] = []
+ if test_name is not None:
+ self.tests_for_rerun[test_filename].append(test_name)
+ else:
+ sys.stderr.write(
+ "\nerror: couldn't add testrun-failing test to rerun "
+ "list because no eligibility key could be created.\n")
+
def _maybe_remap_job_result_event(self, test_event):
    """Turns a timeout/exceptional-exit job result into a test result.

    @param test_event the job_result test event.  This is an in/out
    parameter: when its status is a timeout or an exceptional exit and a
    test start event is on record for its worker index, the event type
    is changed to a test result and every field of that test start event
    that the job result lacks is copied over.
    """
    job_status = test_event["status"]
    remappable_statuses = [
        EventBuilder.STATUS_TIMEOUT,
        EventBuilder.STATUS_EXCEPTIONAL_EXIT]
    if job_status not in remappable_statuses:
        return

    worker_index = test_event.get("worker_index", None)
    if worker_index is None:
        return

    last_started = self.started_tests_by_worker.get(worker_index, None)
    if last_started is None:
        return

    test_event["event"] = EventBuilder.TYPE_TEST_RESULT

    # Only fill in fields missing from the job status message; values
    # already present on the job result win.
    for field_name, field_value in last_started.items():
        test_event.setdefault(field_name, field_value)
+
+ def _maybe_remap_expected_timeout(self, event):
+ if event is None:
+ return
+
+ status = event.get("status", None)
+ if status is None or status != EventBuilder.STATUS_TIMEOUT:
+ return
+
+ # Check if the timeout test's basename is in the expected timeout
+ # list. If so, convert to an expected timeout.
+ basename = os.path.basename(event.get("test_filename", ""))
+ if basename in self.expected_timeouts_by_basename:
+ # Convert to an expected timeout.
+ event["status"] = EventBuilder.STATUS_EXPECTED_TIMEOUT
+
+ def _maybe_remap_expected_failure(self, event):
+ if event is None:
+ return
+
+ key = self._make_key(event)
+ if key not in self.expected_failures:
+ return
+
+ status = event.get("status", None)
+ if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES:
+ event["status"] = EventBuilder.STATUS_EXPECTED_FAILURE
+ elif status == EventBuilder.STATUS_SUCCESS:
+ event["status"] = EventBuilder.STATUS_UNEXPECTED_SUCCESS
+
def handle_event(self, test_event):
    """Handles the test event for collection into the formatter output.

    Derived classes may override this but should call down to this
    implementation first.

    @param test_event the test event as formatted by one of the
    event_for_* calls.  A None event is ignored.
    """
    with self.lock:
        if test_event is None:
            return

        kind = test_event.get("event", "")

        # A job result (timeout, exceptional exit) may be rewritten in
        # place into a test_result event, so the event type is read
        # again after each remapping step.
        if kind == EventBuilder.TYPE_JOB_RESULT:
            self._maybe_remap_job_result_event(test_event)
            kind = test_event.get("event", "")

        # Remap timeouts to expected timeouts, and results of tests
        # marked as expected failures.
        if kind in EventBuilder.RESULT_TYPES:
            self._maybe_remap_expected_timeout(test_event)
            self._maybe_remap_expected_failure(test_event)
            kind = test_event.get("event", "")

        if kind == "terminate":
            # Remember that terminate was received, so that
            # send_terminate_as_needed() does not send it again.
            self.terminate_called = True
            return

        if kind in EventBuilder.RESULT_TYPES:
            # Count the result by status.  The only job (i.e. inferior
            # process) results that get here are ones that could not be
            # remapped to the most recently started test of a worker.
            status = test_event["status"]
            self.result_status_counts[status] += 1

            # The worker is done with its most recently started test.
            worker_index = test_event.get("worker_index", None)
            if worker_index is not None:
                self.started_tests_by_worker.pop(worker_index, None)

            # Statuses in this collection fail the test run; such a test
            # is added to the rerun list when it qualifies.
            if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES:
                self._maybe_add_test_to_rerun_list(test_event)

            test_key = self._make_key(test_event)
            if test_key is None:
                raise Exception(
                    "failed to find test filename for "
                    "test event {}".format(test_event))

            # Keep only the most recent event per test key, so a later
            # test phase (e.g. a low-load rerun) overrides the earlier
            # result.  When replacing, undo the count of the replaced
            # result so the test is not counted twice.
            if test_key in self.result_events:
                replaced_status = self.result_events[test_key]["status"]
                self.result_status_counts[replaced_status] -= 1
                self.test_method_rerun_count += 1
            self.result_events[test_key] = test_event
            return

        if kind == EventBuilder.TYPE_TEST_START:
            # Track the start time for the test method.
            self.track_start_time(
                test_event["test_class"],
                test_event["test_name"],
                test_event["event_time"])
            # Remember the most recent test start per worker, so a
            # process timeout or exceptional exit can be charged to a
            # test method.
            worker_index = test_event.get("worker_index", None)
            if worker_index is not None:
                self.started_tests_by_worker[worker_index] = test_event
            return

        if kind == EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE:
            self._mark_test_for_rerun_eligibility(test_event)
        elif kind == EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE:
            self._mark_test_as_expected_failure(test_event)
+
def set_expected_timeouts_by_basename(self, basenames):
    """Registers test file basenames that are allowed to time out.

    A timeout in one of these files is reported with the
    STATUS_EXPECTED_TIMEOUT status instead of as a timeout issue.

    @param basenames an iterable of test file basenames, or None to
    register nothing.  Names are added to those already registered.
    """
    if basenames is None:
        return
    self.expected_timeouts_by_basename.update(basenames)
+
def track_start_time(self, test_class, test_name, start_time):
    """Remembers when a test started so its elapsed time can be computed.

    Storing the start time per test means results do not have to be
    processed serially by test; elapsed_time_for_test() looks the value
    up again later.

    @param test_class the test class name; nothing is stored when None.
    @param test_name the test method name; nothing is stored when None.
    @param start_time the start time to remember.
    """
    if test_class is not None and test_name is not None:
        lookup = "{}.{}".format(test_class, test_name)
        self.start_time_by_test[lookup] = start_time
+
def elapsed_time_for_test(self, test_class, test_name, end_time):
    """Returns the elapsed time for a test and forgets its start time.

    Because the stored start time is removed, this gives a real answer
    only once per tracked test; track_start_time() must have been
    called for the test beforehand.

    @param test_class the test class name.
    @param test_name the test method name.
    @param end_time the time the test ended.

    @return end_time minus the stored start time; -2.0 when test_class
    or test_name is None; -1.0 when no start time is stored.
    """
    if test_class is None or test_name is None:
        return -2.0

    lookup = "{}.{}".format(test_class, test_name)
    try:
        started_at = self.start_time_by_test.pop(lookup)
    except KeyError:
        return -1.0
    return end_time - started_at
+
def is_using_terminal(self):
    """Tells whether this results formatter is using the terminal.

    @return the using_terminal flag; when it is true, other output to
    the terminal should be avoided.
    """
    return self.using_terminal
+
def send_terminate_as_needed(self):
    """Sends the terminate event unless one was already received."""
    if self.terminate_called:
        return
    self.handle_event(EventBuilder.bare_event("terminate"))
+
# Kept as an instance method: derived classes may need self here.
# pylint: disable=no-self-use
# noinspection PyMethodMayBeStatic,PyMethodMayBeStatic
def replaces_summary(self):
    """Tells whether this formatter prints its own results summary.

    @return True when the formatter's summary can stand in for the old
    lldb test run results, so the test runner may skip generating its
    own; False otherwise.  The base implementation returns False.
    """
    return False
+
def counts_by_test_result_status(self, status):
    """Returns the number of results recorded for the given status.

    @param status a test result status (e.g. success, fail, skip)
    as defined by the EventBuilder.STATUS_* class members.

    @return the integer count stored for that status.
    """
    return self.result_status_counts[status]
+
+ @classmethod
+ def _event_sort_key(cls, event):
+ """Returns the sort key to be used for a test event.
+
+ This method papers over the differences in a test method result vs. a
+ job (i.e. inferior process) result.
+
+ @param event a test result or job result event.
+ @return a key useful for sorting events by name (test name preferably,
+ then by test filename).
+ """
+ if "test_name" in event:
+ return event["test_name"]
+ else:
+ return event.get("test_filename", None)
+
+ def _partition_results_by_status(self, categories):
+ """Partitions the captured test results by event status.
+
+ This permits processing test results by the category ids.
+
+ @param categories the list of categories on which to partition.
+ Follows the format described in _report_category_details().
+
+ @return a dictionary where each key is the test result status,
+ and each entry is a list containing all the test result events
+ that matched that test result status. Result status IDs with
+ no matching entries will have a zero-length list.
+ """
+ partitioned_events = {}
+ for category in categories:
+ result_status_id = category[0]
+ matching_events = [
+ [key, event] for (key, event) in self.result_events.items()
+ if event.get("status", "") == result_status_id]
+ partitioned_events[result_status_id] = sorted(
+ matching_events,
+ key=lambda x: self._event_sort_key(x[1]))
+ return partitioned_events
+
+ @staticmethod
+ def _print_banner(out_file, banner_text):
+ """Prints an ASCII banner around given text.
+
+ Output goes to the out file for the results formatter.
+
+ @param out_file a file-like object where output will be written.
+ @param banner_text the text to display, with a banner
+ of '=' around the line above and line below.
+ """
+ banner_separator = "".ljust(len(banner_text), "=")
+
+ out_file.write("\n{}\n{}\n{}\n".format(
+ banner_separator,
+ banner_text,
+ banner_separator))
+
+ def _print_summary_counts(
+ self, out_file, categories, result_events_by_status, extra_rows):
+ """Prints summary counts for all categories.
+
+ @param out_file a file-like object used to print output.
+
+ @param categories the list of categories on which to partition.
+ Follows the format described in _report_category_details().
+
+ @param result_events_by_status the partitioned list of test
+ result events in a dictionary, with the key set to the test
+ result status id and the value set to the list of test method
+ results that match the status id.
+ """
+
+ # Get max length for category printed name
+ category_with_max_printed_name = max(
+ categories, key=lambda x: len(x[1]))
+ max_category_name_length = len(category_with_max_printed_name[1])
+
+ # If we are provided with extra rows, consider these row name lengths.
+ if extra_rows is not None:
+ for row in extra_rows:
+ name_length = len(row[0])
+ if name_length > max_category_name_length:
+ max_category_name_length = name_length
+
+ self._print_banner(out_file, "Test Result Summary")
+
+ # Prepend extra rows
+ if extra_rows is not None:
+ for row in extra_rows:
+ extra_label = "{}:".format(row[0]).ljust(
+ max_category_name_length + 1)
+ out_file.write("{} {:4}\n".format(extra_label, row[1]))
+
+ for category in categories:
+ result_status_id = category[0]
+ result_label = "{}:".format(category[1]).ljust(
+ max_category_name_length + 1)
+ count = len(result_events_by_status[result_status_id])
+ out_file.write("{} {:4}\n".format(
+ result_label,
+ count))
+
+ @classmethod
+ def _has_printable_details(cls, categories, result_events_by_status):
+ """Returns whether there are any test result details that need to be printed.
+
+ This will spin through the results and see if any result in a category
+ that is printable has any results to print.
+
+ @param categories the list of categories on which to partition.
+ Follows the format described in _report_category_details().
+
+ @param result_events_by_status the partitioned list of test
+ result events in a dictionary, with the key set to the test
+ result status id and the value set to the list of test method
+ results that match the status id.
+
+ @return True if there are any details (i.e. test results
+ for failures, errors, unexpected successes); False otherwise.
+ """
+ for category in categories:
+ result_status_id = category[0]
+ print_matching_tests = category[2]
+ if print_matching_tests:
+ if len(result_events_by_status[result_status_id]) > 0:
+ # We found a printable details test result status
+ # that has details to print.
+ return True
+ # We didn't find any test result category with printable
+ # details.
+ return False
+
+ @staticmethod
+ def _report_category_details(out_file, category, result_events_by_status):
+ """Reports all test results matching the given category spec.
+
+ @param out_file a file-like object used to print output.
+
+ @param category a category spec of the format [test_event_name,
+ printed_category_name, print_matching_entries?]
+
+ @param result_events_by_status the partitioned list of test
+ result events in a dictionary, with the key set to the test
+ result status id and the value set to the list of test method
+ results that match the status id.
+ """
+ result_status_id = category[0]
+ print_matching_tests = category[2]
+ detail_label = category[3]
+
+ if print_matching_tests:
+ # Sort by test name
+ for (_, event) in result_events_by_status[result_status_id]:
+ # Convert full test path into test-root-relative.
+ test_relative_path = os.path.relpath(
+ os.path.realpath(event["test_filename"]),
+ lldbsuite.lldb_test_root)
+
+ # Create extra info component (used for exceptional exit info)
+ if result_status_id == EventBuilder.STATUS_EXCEPTIONAL_EXIT:
+ extra_info = "[EXCEPTIONAL EXIT {} ({})] ".format(
+ event["exception_code"],
+ event["exception_description"])
+ else:
+ extra_info = ""
+
+ # Figure out the identity we will use for this test.
+ if configuration.verbose and ("test_class" in event):
+ test_id = "{}.{}".format(
+ event["test_class"], event["test_name"])
+ elif "test_name" in event:
+ test_id = event["test_name"]
+ else:
+ test_id = "<no_running_test_method>"
+
+ # Display the info.
+ out_file.write("{}: {}{} ({})\n".format(
+ detail_label,
+ extra_info,
+ test_id,
+ test_relative_path))
+
def print_results(self, out_file):
    """Writes the test result report to the output file.

    The report consists of an optional "Issue Details" section, the
    "Test Result Summary" counts and, when self.options.dump_results
    is set, a raw dump of every partitioned result event.

    @param out_file a file-like object used for printing summary
    results. This is different than self.out_file, which might
    be something else for non-summary data.
    """
    # Each category: result id, printed name, print matching tests?,
    # detail label.
    categories = [
        [EventBuilder.STATUS_SUCCESS, "Success", False, None],
        [EventBuilder.STATUS_EXPECTED_FAILURE,
         "Expected Failure", False, None],
        [EventBuilder.STATUS_FAILURE, "Failure", True, "FAIL"],
        [EventBuilder.STATUS_ERROR, "Error", True, "ERROR"],
        [EventBuilder.STATUS_EXCEPTIONAL_EXIT,
         "Exceptional Exit", True, "ERROR"],
        [EventBuilder.STATUS_UNEXPECTED_SUCCESS,
         "Unexpected Success", True, "UNEXPECTED SUCCESS"],
        [EventBuilder.STATUS_SKIP, "Skip", False, None],
        [EventBuilder.STATUS_TIMEOUT, "Timeout", True, "TIMEOUT"],
        # Intentionally using the unusual hyphenation in TIME-OUT to
        # prevent buildbots from thinking it is an issue when scanning
        # for TIMEOUT.
        [EventBuilder.STATUS_EXPECTED_TIMEOUT,
         "Expected Timeout", True, "EXPECTED TIME-OUT"],
    ]

    # Group every captured event under its result status.
    events_by_status = self._partition_results_by_status(categories)

    # Details section, emitted only when there is something to show.
    if self._has_printable_details(categories, events_by_status):
        self._print_banner(out_file, "Issue Details")
        for category in categories:
            self._report_category_details(
                out_file, category, events_by_status)

    # Summary section. The first extra row counts the test methods
    # processed, excluding reruns.
    summary_extra_rows = [
        ["Test Methods", len(self.result_events)],
        ["Reruns", self.test_method_rerun_count]]
    self._print_summary_counts(
        out_file, categories, events_by_status, summary_extra_rows)

    if not self.options.dump_results:
        return

    # Debug dump of the key/result info for all categories.
    self._print_banner(out_file, "Results Dump")
    for status, keyed_events in events_by_status.items():
        out_file.write("\nSTATUS: {}\n".format(status))
        for key, event in keyed_events:
            out_file.write("key: {}\n".format(key))
            out_file.write("event: {}\n".format(event))
diff --git a/packages/Python/lldbsuite/test_event/formatter/xunit.py b/packages/Python/lldbsuite/test_event/formatter/xunit.py
new file mode 100644
index 000000000000..b3682f8fb107
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/formatter/xunit.py
@@ -0,0 +1,592 @@
+"""
+ The LLVM Compiler Infrastructure
+
+This file is distributed under the University of Illinois Open Source
+License. See LICENSE.TXT for details.
+
+Provides an xUnit ResultsFormatter for integrating the LLDB
+test suite with the Jenkins xUnit aggregator and other xUnit-compliant
+test output processors.
+"""
+from __future__ import absolute_import
+from __future__ import print_function
+
+# System modules
+import re
+import sys
+import xml.sax.saxutils
+
+# Third-party modules
+import six
+
+# Local modules
+from ..event_builder import EventBuilder
+from ..build_exception import BuildError
+from .results_formatter import ResultsFormatter
+
+
class XunitFormatter(ResultsFormatter):
    """Provides xUnit-style formatted output.

    When the "terminate" event arrives, every event stored in
    self.result_events is converted into a <testcase> element and the
    complete <testsuites> document is written to self.out_file.
    """

    # Result mapping arguments
    RM_IGNORE = 'ignore'
    RM_SUCCESS = 'success'
    RM_FAILURE = 'failure'
    RM_PASSTHRU = 'passthru'

    @staticmethod
    def _build_illegal_xml_regex():
        """Constructs a regex to match all illegal xml characters.

        Expects to be used against a unicode string.

        @return a compiled regex consisting of a single character class
        that covers every code point range listed below.
        """
        # Construct the range pairs of invalid unicode characters.
        illegal_chars_u = [
            (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
            (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]

        # For wide builds, we have more.
        if sys.maxunicode >= 0x10000:
            illegal_chars_u.extend(
                [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF),
                 (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
                 (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF),
                 (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
                 (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
                 (0x10FFFE, 0x10FFFF)])

        # Build up an array of range expressions.
        illegal_ranges = [
            "%s-%s" % (six.unichr(low), six.unichr(high))
            for (low, high) in illegal_chars_u]

        # Compile the regex
        return re.compile(six.u('[%s]') % six.u('').join(illegal_ranges))

    @staticmethod
    def _quote_attribute(text):
        """Returns the given text in a manner safe for usage in an XML attribute.

        @param text the text that should appear within an XML attribute.
        @return the attribute-escaped version of the input text,
        including the surrounding quote characters.
        """
        return xml.sax.saxutils.quoteattr(text)

    @staticmethod
    def _escape_cdata(text):
        """Makes the given text safe for inclusion in a CDATA section.

        A literal "]]>" would terminate the CDATA section early, so each
        occurrence is split across two adjacent CDATA sections.

        @param text the text destined for the inside of <![CDATA[...]]>.
        @return the text with every "]]>" rewritten.
        """
        return text.replace("]]>", "]]]]><![CDATA[>")

    def _replace_invalid_xml(self, str_or_unicode):
        """Replaces invalid XML characters with a '?'.

        @param str_or_unicode a string to replace invalid XML
        characters within.  Can be unicode or not.  If not unicode,
        assumes it is a byte string in utf-8 encoding; undecodable
        bytes are replaced rather than raising.

        @returns the sanitized text as the interpreter's native string
        type: a utf-8-encoded byte string on Python 2, a (unicode) str
        on Python 3.  This keeps the result usable with str.format() and
        xml.sax.saxutils.quoteattr() on both interpreters.
        """
        # Get the content into unicode.  Checking against the binary type
        # (rather than str) is what makes this work on Python 3, where
        # str is already unicode and has no decode() method.
        if isinstance(str_or_unicode, six.binary_type):
            unicode_content = str_or_unicode.decode('utf-8', 'replace')
        else:
            unicode_content = str_or_unicode

        sanitized = self.invalid_xml_re.sub(six.u('?'), unicode_content)
        if six.PY2:
            return sanitized.encode('utf-8')
        return sanitized

    @classmethod
    def arg_parser(cls):
        """@return arg parser used to parse formatter-specific options."""
        parser = super(XunitFormatter, cls).arg_parser()

        # These are valid choices for results mapping.
        results_mapping_choices = [
            XunitFormatter.RM_IGNORE,
            XunitFormatter.RM_SUCCESS,
            XunitFormatter.RM_FAILURE,
            XunitFormatter.RM_PASSTHRU]
        parser.add_argument(
            "--assert-on-unknown-events",
            action="store_true",
            help=('cause unknown test events to generate '
                  'a python assert.  Default is to ignore.'))
        parser.add_argument(
            "--ignore-skip-name",
            "-n",
            metavar='PATTERN',
            action="append",
            dest='ignore_skip_name_patterns',
            help=('a python regex pattern, where '
                  'any skipped test with a test method name where regex '
                  'matches (via search) will be ignored for xUnit test '
                  'result purposes.  Can be specified multiple times.'))
        parser.add_argument(
            "--ignore-skip-reason",
            "-r",
            metavar='PATTERN',
            action="append",
            dest='ignore_skip_reason_patterns',
            help=('a python regex pattern, where '
                  'any skipped test with a skip reason where the regex '
                  'matches (via search) will be ignored for xUnit test '
                  'result purposes.  Can be specified multiple times.'))
        parser.add_argument(
            "--xpass", action="store", choices=results_mapping_choices,
            default=XunitFormatter.RM_FAILURE,
            help=('specify mapping from unexpected success to jUnit/xUnit '
                  'result type'))
        parser.add_argument(
            "--xfail", action="store", choices=results_mapping_choices,
            default=XunitFormatter.RM_IGNORE,
            help=('specify mapping from expected failure to jUnit/xUnit '
                  'result type'))
        return parser

    @staticmethod
    def _build_regex_list_from_patterns(patterns):
        """Builds a list of compiled regular expressions from option value.

        @param patterns contains a list of regular expression
        patterns, or None.

        @return list of compiled regular expressions, empty if no
        patterns provided.
        """
        if patterns is None:
            return []
        return [re.compile(pattern) for pattern in patterns]

    def __init__(self, out_file, options, file_is_stream):
        """Initializes the XunitFormatter instance.

        @param out_file file-like object where formatted output is written.
        @param options the parsed formatter options; the attributes
        ignore_skip_name_patterns and ignore_skip_reason_patterns are
        read here.
        @param file_is_stream passed through unchanged to the
        ResultsFormatter initializer.
        """
        # Initialize the parent
        super(XunitFormatter, self).__init__(out_file, options, file_is_stream)
        self.text_encoding = "UTF-8"
        self.invalid_xml_re = XunitFormatter._build_illegal_xml_regex()
        self.total_test_count = 0
        self.ignore_skip_name_regexes = (
            XunitFormatter._build_regex_list_from_patterns(
                options.ignore_skip_name_patterns))
        self.ignore_skip_reason_regexes = (
            XunitFormatter._build_regex_list_from_patterns(
                options.ignore_skip_reason_patterns))

        # Rendered <testcase> elements, bucketed by outcome.  "all" holds
        # every element in the order it was created.
        self.elements = {
            "successes": [],
            "errors": [],
            "failures": [],
            "skips": [],
            "unexpected_successes": [],
            "expected_failures": [],
            "all": []
        }

        # Dispatch table from result status to its handler.
        self.status_handlers = {
            EventBuilder.STATUS_SUCCESS: self._handle_success,
            EventBuilder.STATUS_FAILURE: self._handle_failure,
            EventBuilder.STATUS_ERROR: self._handle_error,
            EventBuilder.STATUS_SKIP: self._handle_skip,
            EventBuilder.STATUS_EXPECTED_FAILURE:
                self._handle_expected_failure,
            EventBuilder.STATUS_EXPECTED_TIMEOUT:
                self._handle_expected_timeout,
            EventBuilder.STATUS_UNEXPECTED_SUCCESS:
                self._handle_unexpected_success,
            EventBuilder.STATUS_EXCEPTIONAL_EXIT:
                self._handle_exceptional_exit,
            EventBuilder.STATUS_TIMEOUT:
                self._handle_timeout
        }

    RESULT_TYPES = {EventBuilder.TYPE_TEST_RESULT, EventBuilder.TYPE_JOB_RESULT}

    def handle_event(self, test_event):
        """Handles a single test event.

        On "terminate", converts all of self.result_events into XML and
        writes the report.  Result events themselves need no immediate
        work here.  Any other event type raises only when the
        assert_on_unknown_events option is set.

        @param test_event the test event dictionary to handle.
        """
        super(XunitFormatter, self).handle_event(test_event)

        event_type = test_event["event"]
        if event_type is None:
            return

        if event_type == "terminate":
            # Process all the final result events into their
            # XML counterparts.
            for result_event in self.result_events.values():
                self._process_test_result(result_event)
            self._finish_output()
        elif event_type in XunitFormatter.RESULT_TYPES:
            # Result events are not unknown: they get converted in bulk
            # from self.result_events once "terminate" arrives.
            # NOTE(review): presumably the base class handle_event()
            # stores them there - confirm against ResultsFormatter.
            return
        elif self.options.assert_on_unknown_events:
            # This is an unknown event.
            raise Exception("unknown event type {} from {}\n".format(
                event_type, test_event))

    def _add_issue_entry(self, test_event, tag, bucket, details):
        """Adds a testcase whose issue element carries a CDATA body.

        @param test_event the test event; must contain "issue_message"
        and "issue_class".
        @param tag the name of the inner XML element ("failure"/"error").
        @param bucket the key of self.elements receiving the result.
        @param details the text to place inside the CDATA section.
        """
        message = self._replace_invalid_xml(test_event["issue_message"])
        cdata_body = XunitFormatter._escape_cdata(
            self._replace_invalid_xml(details))

        result = self._common_add_testcase_entry(
            test_event,
            inner_content=(
                '<{0} type={1} message={2}><![CDATA[{3}]]></{0}>'.format(
                    tag,
                    XunitFormatter._quote_attribute(test_event["issue_class"]),
                    XunitFormatter._quote_attribute(message),
                    cdata_body)
            ))
        with self.lock:
            self.elements[bucket].append(result)

    def _add_job_error_entry(self, test_event, error_type, message_text):
        """Adds an <error> testcase that has no body.

        @param test_event the test method or job result event.
        @param error_type the value of the error element's type attribute.
        @param message_text the unsanitized error message.
        """
        message = self._replace_invalid_xml(message_text)

        # Both attributes are quoted: an unquoted attribute value is not
        # well-formed XML.
        result = self._common_add_testcase_entry(
            test_event,
            inner_content=(
                '<error type={} message={}></error>'.format(
                    XunitFormatter._quote_attribute(error_type),
                    XunitFormatter._quote_attribute(message))
            ))
        with self.lock:
            self.elements["errors"].append(result)

    @staticmethod
    def _event_display_name(test_event):
        """Returns the test name, else the test filename, of an event.

        @param test_event the test method or job result event.
        @return the name used in error messages for this event.
        """
        if "test_name" in test_event:
            return test_event["test_name"]
        return test_event.get("test_filename", "<unknown test/filename>")

    def _handle_success(self, test_event):
        """Handles a test success.
        @param test_event the test event to handle.
        """
        result = self._common_add_testcase_entry(test_event)
        with self.lock:
            self.elements["successes"].append(result)

    def _handle_failure(self, test_event):
        """Handles a test failure.
        @param test_event the test event to handle.
        """
        self._add_issue_entry(
            test_event, "failure", "failures",
            "".join(test_event.get("issue_backtrace", [])))

    def _handle_error_build(self, test_event):
        """Handles a test error that occurred during the build phase.
        @param test_event the test event to handle.
        """
        self._add_issue_entry(
            test_event, "error", "errors",
            BuildError.format_build_error(
                test_event.get("build_command", "<None>"),
                test_event.get("build_error", "<None>")))

    def _handle_error_standard(self, test_event):
        """Handles a test error outside of the build phase.
        @param test_event the test event to handle.
        """
        self._add_issue_entry(
            test_event, "error", "errors",
            "".join(test_event.get("issue_backtrace", [])))

    def _handle_error(self, test_event):
        """Handles a test error, dispatching on the issue phase.
        @param test_event the test event to handle.
        """
        if test_event.get("issue_phase", None) == "build":
            self._handle_error_build(test_event)
        else:
            self._handle_error_standard(test_event)

    def _handle_exceptional_exit(self, test_event):
        """Handles an exceptional exit.
        @param test_event the test method or job result event to handle.
        """
        message_text = "ERROR: {} ({}): {}".format(
            test_event.get("exception_code", 0),
            test_event.get("exception_description", ""),
            XunitFormatter._event_display_name(test_event))
        self._add_job_error_entry(test_event, "exceptional_exit", message_text)

    def _handle_timeout(self, test_event):
        """Handles a test method or job timeout.
        @param test_event the test method or job result event to handle.
        """
        message_text = "TIMEOUT: {}".format(
            XunitFormatter._event_display_name(test_event))
        self._add_job_error_entry(test_event, "timeout", message_text)

    @staticmethod
    def _ignore_based_on_regex_list(test_event, test_key, regex_list):
        """Returns whether to ignore a test event based on patterns.

        @param test_event the test event dictionary to check.
        @param test_key the key within the dictionary to check.  A
        missing key is treated as an empty string.
        @param regex_list a list of zero or more compiled regexes.

        @return True if any of the regexes in the list match based on
        the re.search() method; False otherwise.
        """
        for regex in regex_list:
            if regex.search(test_event.get(test_key, '')):
                return True
        return False

    def _handle_skip(self, test_event):
        """Handles a skipped test.
        @param test_event the test event to handle.
        """

        # Are we ignoring this test based on test name?
        if XunitFormatter._ignore_based_on_regex_list(
                test_event, 'test_name', self.ignore_skip_name_regexes):
            return

        # Are we ignoring this test based on skip reason?
        if XunitFormatter._ignore_based_on_regex_list(
                test_event, 'skip_reason', self.ignore_skip_reason_regexes):
            return

        # We're not ignoring this test.  Process the skip.
        reason = self._replace_invalid_xml(test_event.get("skip_reason", ""))
        result = self._common_add_testcase_entry(
            test_event,
            inner_content='<skipped message={} />'.format(
                XunitFormatter._quote_attribute(reason)))
        with self.lock:
            self.elements["skips"].append(result)

    def _handle_expected_failure(self, test_event):
        """Handles a test that failed as expected.

        The xfail option selects how the result is reported.

        @param test_event the test event to handle.
        """
        if self.options.xfail == XunitFormatter.RM_PASSTHRU:
            # This is not a natively-supported junit/xunit
            # testcase mode, so it might fail a validating
            # test results viewer.
            if "bugnumber" in test_event:
                bug_id_attribute = 'bug-id={} '.format(
                    XunitFormatter._quote_attribute(test_event["bugnumber"]))
            else:
                bug_id_attribute = ''

            # Sanitize the message the same way the failure and error
            # handlers do.
            message = self._replace_invalid_xml(test_event["issue_message"])
            result = self._common_add_testcase_entry(
                test_event,
                inner_content=(
                    '<expected-failure {}type={} message={} />'.format(
                        bug_id_attribute,
                        XunitFormatter._quote_attribute(
                            test_event["issue_class"]),
                        XunitFormatter._quote_attribute(message))
                ))
            with self.lock:
                self.elements["expected_failures"].append(result)
        elif self.options.xfail == XunitFormatter.RM_SUCCESS:
            result = self._common_add_testcase_entry(test_event)
            with self.lock:
                self.elements["successes"].append(result)
        elif self.options.xfail == XunitFormatter.RM_FAILURE:
            message = self._replace_invalid_xml(test_event["issue_message"])
            result = self._common_add_testcase_entry(
                test_event,
                inner_content='<failure type={} message={} />'.format(
                    XunitFormatter._quote_attribute(test_event["issue_class"]),
                    XunitFormatter._quote_attribute(message)))
            with self.lock:
                self.elements["failures"].append(result)
        elif self.options.xfail == XunitFormatter.RM_IGNORE:
            pass
        else:
            raise Exception(
                "unknown xfail option: {}".format(self.options.xfail))

    @staticmethod
    def _handle_expected_timeout(test_event):
        """Handles expected_timeout.
        @param test_event the test event to handle.
        """
        # We don't do anything with expected timeouts, not even report.
        pass

    def _handle_unexpected_success(self, test_event):
        """Handles a test that passed but was expected to fail.

        The xpass option selects how the result is reported.

        @param test_event the test event to handle.
        """
        if self.options.xpass == XunitFormatter.RM_PASSTHRU:
            # This is not a natively-supported junit/xunit
            # testcase mode, so it might fail a validating
            # test results viewer.
            result = self._common_add_testcase_entry(
                test_event,
                inner_content="<unexpected-success />")
            with self.lock:
                self.elements["unexpected_successes"].append(result)
        elif self.options.xpass == XunitFormatter.RM_SUCCESS:
            # Treat the xpass as a success.
            result = self._common_add_testcase_entry(test_event)
            with self.lock:
                self.elements["successes"].append(result)
        elif self.options.xpass == XunitFormatter.RM_FAILURE:
            # Treat the xpass as a failure.
            if "bugnumber" in test_event:
                message = "unexpected success (bug_id:{})".format(
                    test_event["bugnumber"])
            else:
                message = "unexpected success (bug_id:none)"
            result = self._common_add_testcase_entry(
                test_event,
                inner_content='<failure type={} message={} />'.format(
                    XunitFormatter._quote_attribute("unexpected_success"),
                    XunitFormatter._quote_attribute(message)))
            with self.lock:
                self.elements["failures"].append(result)
        elif self.options.xpass == XunitFormatter.RM_IGNORE:
            # Ignore the xpass result as far as xUnit reporting goes.
            pass
        else:
            raise Exception("unknown xpass option: {}".format(
                self.options.xpass))

    def _process_test_result(self, test_event):
        """Processes the test_event known to be a test result.

        This categorizes the event appropriately and stores the data needed
        to generate the final xUnit report.

        @param test_event the test result event dictionary.

        Raises Exception when the event has no "status" entry or when
        its status has no registered handler.
        """
        if "status" not in test_event:
            raise Exception("test event dictionary missing 'status' key")

        status = test_event["status"]
        if status not in self.status_handlers:
            raise Exception("test event status '{}' unsupported".format(
                status))

        # Call the status handler for the test result.
        self.status_handlers[status](test_event)

    def _common_add_testcase_entry(self, test_event, inner_content=None):
        """Registers a testcase result, and returns the text created.

        The caller is expected to manage failure/skip/success counts
        in some kind of appropriate way.  This call simply constructs
        the XML, appends it to the self.elements["all"] list and bumps
        self.total_test_count.

        @param test_event the test event dictionary.  Must contain
        "event_time"; "test_class" and "test_name" are optional.

        @param inner_content if specified, gets included in the <testcase>
        inner section, at the point before stdout and stderr would be
        included.  This is where a <failure/>, <skipped/>, <error/>, etc.
        could go.

        @return the text of the xml testcase element.
        """

        # Get elapsed time.
        test_class = test_event.get("test_class", "<no_class>")
        test_name = test_event.get("test_name", "<no_test_method>")
        event_time = test_event["event_time"]
        time_taken = self.elapsed_time_for_test(
            test_class, test_name, event_time)

        # Plumb in stdout/stderr once we shift over to only test results.
        test_stdout = ''
        test_stderr = ''

        # Formulate the output xml.  The class and method names are
        # attribute-escaped: the placeholder defaults above contain '<',
        # which may not appear literally inside an attribute value.
        if not inner_content:
            inner_content = ""
        result = (
            '<testcase classname={} name={} time="{:.3f}">'
            '{}{}{}</testcase>'.format(
                XunitFormatter._quote_attribute(test_class),
                XunitFormatter._quote_attribute(test_name),
                time_taken,
                inner_content,
                test_stdout,
                test_stderr))

        # Save the result, update total test count.
        with self.lock:
            self.total_test_count += 1
            self.elements["all"].append(result)

        return result

    def _finish_output_no_lock(self):
        """Flushes out the report of test executions to form valid xml output.

        xUnit output is in XML.  The reporting system cannot complete the
        formatting of the output without knowing when there is no more input.
        This call addresses notification of the completed test run and thus is
        when we can finish off the report output.

        The caller is expected to hold self.lock.
        """

        # Figure out the counts line for the testsuite.  If we have
        # been counting either unexpected successes or expected
        # failures, we'll output those in the counts, at the risk of
        # being invalidated by a validating test results viewer.
        # These aren't counted by default so they won't show up unless
        # the user specified a formatter option to include them.
        xfail_count = len(self.elements["expected_failures"])
        xpass_count = len(self.elements["unexpected_successes"])
        if xfail_count > 0 or xpass_count > 0:
            extra_testsuite_attributes = (
                ' expected-failures="{}"'
                ' unexpected-successes="{}"'.format(xfail_count, xpass_count))
        else:
            extra_testsuite_attributes = ""

        # Output the header.
        self.out_file.write(
            '<?xml version="1.0" encoding="{}"?>\n'
            '<testsuites>'
            '<testsuite name="{}" tests="{}" errors="{}" failures="{}" '
            'skip="{}"{}>\n'.format(
                self.text_encoding,
                "LLDB test suite",
                self.total_test_count,
                len(self.elements["errors"]),
                len(self.elements["failures"]),
                len(self.elements["skips"]),
                extra_testsuite_attributes))

        # Output each of the test result entries.
        for result in self.elements["all"]:
            self.out_file.write(result + '\n')

        # Close off the test suite.
        self.out_file.write('</testsuite></testsuites>\n')

    def _finish_output(self):
        """Finish writing output as all incoming events have arrived."""
        with self.lock:
            self._finish_output_no_lock()
diff --git a/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py
new file mode 100644
index 000000000000..7f5c4cb79cf5
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py
@@ -0,0 +1,13 @@
+from __future__ import print_function
+from lldbsuite.test import lldbtest
+from lldbsuite.test import decorators
+
+
class NonExistentDecoratorTestCase(lldbtest.TestBase):
    """Test resource whose only test method uses a decorator named
    nonExistentDecorator, looked up on the decorators module."""

    mydir = lldbtest.TestBase.compute_mydir(__file__)

    @decorators.nonExistentDecorator(bugnumber="yt/1300")
    def test(self):
        """Verify non-existent decorators are picked up by test runner."""
diff --git a/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py
new file mode 100644
index 000000000000..56814416c333
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+"""
+Tests that the event system reports issues during decorator
+handling as errors.
+"""
+# System-provided imports
+import os
+import unittest
+
+# Local-provided imports
+import event_collector
+
+
class TestCatchInvalidDecorator(unittest.TestCase):
    """Checks that running the invalid_decorator resource yields at
    least one error result event."""

    TEST_DIR = os.path.join(
        os.path.dirname(__file__),
        os.path.pardir,
        "resources",
        "invalid_decorator")

    def _assert_has_error_results(self, events):
        """Asserts that the events contain at least one error result.

        @param events the collected test events to inspect.
        """
        error_results = _filter_error_results(events)
        self.assertGreater(
            len(error_results),
            0,
            "At least one job or test error result should have been returned")

    def test_with_whole_file(self):
        """
        Test that a non-existent decorator generates a test-event error
        when running all tests in the file.
        """
        # The test case file exercised by this test.
        test_file = os.path.join(self.TEST_DIR, "TestInvalidDecorator.py")

        # Gather every test event generated for that file and check it.
        events = event_collector.collect_events_whole_file(test_file)
        self._assert_has_error_results(events)

    def test_with_function_filter(self):
        """
        Test that a non-existent decorator generates a test-event error
        when running a filtered test.
        """
        # Collect all test events generated during running of tests
        # in a given directory using a test name filter.  Internally,
        # this runs through a different code path that needs to be
        # set up to catch exceptions.
        events = event_collector.collect_events_for_directory_with_filter(
            self.TEST_DIR,
            "NonExistentDecoratorTestCase.test")
        self._assert_has_error_results(events)
+
+
+def _filter_error_results(events):
+ # Filter out job result events.
+ return [
+ event
+ for event in events
+ if event.get("event", None) in ["job_result", "test_result"] and
+ event.get("status", None) == "error"
+ ]
+
+
# Run the unittest command-line runner when this file is executed
# as a script.
if __name__ == "__main__":
    unittest.main()
diff --git a/packages/Python/lldbsuite/test_event/test/src/event_collector.py b/packages/Python/lldbsuite/test_event/test/src/event_collector.py
new file mode 100644
index 000000000000..35d132066898
--- /dev/null
+++ b/packages/Python/lldbsuite/test_event/test/src/event_collector.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+import subprocess
+import sys
+import tempfile
+
+# noinspection PyUnresolvedReferences
+from six.moves import cPickle
+
+
+def path_to_dotest_py():
+    """Return the path to dotest.py, built relative to this file.
+
+    Starting from the directory that contains this file, the path climbs
+    six parent directories and then descends into test/dotest.py. The
+    result is not normalized, so the parent-directory components remain
+    in the returned string.
+    """
+    levels_up = [os.path.pardir] * 6
+    components = (
+        [os.path.dirname(__file__)] + levels_up + ["test", "dotest.py"])
+    return os.path.join(*components)
+
+
+def _make_pickled_events_filename():
+    """Create an empty temporary file and return its path.
+
+    The file is created with delete=False, so it remains on disk after
+    it is closed here; removing it is left to the caller.
+    """
+    with tempfile.NamedTemporaryFile(
+            delete=False,
+            prefix="lldb_test_event_pickled_event_output") as handle:
+        events_filename = handle.name
+    return events_filename
+
+
+def _collect_events_with_command(command, events_filename):
+    """Run command and return the events it pickled into events_filename.
+
+    Args:
+        command: argument list passed to subprocess.call(). The child's
+            stdout and stderr are discarded and its exit status is
+            ignored.
+        events_filename: path of the file the command is expected to fill
+            with back-to-back pickled events. If the file exists after
+            the command has run, it is removed before this function
+            returns or raises.
+
+    Returns:
+        A list of the truthy unpickled events, in file order. The list is
+        empty when the file does not exist after the command has run.
+
+    Raises:
+        Any exception other than EOFError raised while unpickling (for
+        example on a corrupt file) propagates to the caller; the events
+        file is still removed in that case.
+    """
+    # Run the command, discarding everything it prints. The exit status
+    # is not checked; callers inspect the collected events instead.
+    with open(os.devnull, 'w') as dev_null_file:
+        subprocess.call(
+            command,
+            stdout=dev_null_file,
+            stderr=dev_null_file)
+
+    events = []
+    if not os.path.exists(events_filename):
+        return events
+
+    try:
+        with open(events_filename, "rb") as events_file:
+            while True:
+                try:
+                    # NOTE(review): unpickling can execute arbitrary code.
+                    # This assumes the file was written by the command we
+                    # just launched and is therefore trusted.
+                    event = cPickle.load(events_file)
+                except EOFError:
+                    # End of file: every event has been read.
+                    break
+                if event:
+                    events.append(event)
+    finally:
+        # Always clean up the temp file, even if unpickling failed.
+        os.remove(events_filename)
+    return events
+
+
+def collect_events_whole_file(test_filename):
+    """Run dotest.py against a single test file and return its events.
+
+    dotest.py is launched with the current Python interpreter. The
+    directory part of test_filename is passed as the final argument and
+    its base name is passed via -p. Results are written by the
+    RawPickledFormatter to a fresh temporary file, which is then read
+    back and returned as a list of events.
+    """
+    results_path = _make_pickled_events_filename()
+    test_dir, test_basename = os.path.split(test_filename)
+    dotest_args = [
+        "--inferior",
+        "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter",
+        "--results-file={}".format(results_path),
+        "-p", test_basename,
+        test_dir,
+    ]
+    full_command = [sys.executable, path_to_dotest_py()] + dotest_args
+    return _collect_events_with_command(full_command, results_path)
+
+
+def collect_events_for_directory_with_filter(test_filename, filter_desc):
+    """Run dotest.py over a directory with a test filter; return the events.
+
+    Args:
+        test_filename: either a directory to run, or the path of a file
+            whose containing directory should be run. An existing
+            directory is used as-is; for any other path its directory
+            part is used.
+        filter_desc: value passed to dotest.py via -f.
+
+    Returns:
+        The list of events read back from the temporary results file
+        written by the RawPickledFormatter.
+    """
+    # Previously os.path.dirname() was applied unconditionally, so a
+    # directory given without a trailing separator silently resolved to
+    # its parent, while the same path with a trailing separator resolved
+    # to itself. Use a directory argument directly to make both agree.
+    if os.path.isdir(test_filename):
+        test_dir = test_filename
+    else:
+        test_dir = os.path.dirname(test_filename)
+
+    events_filename = _make_pickled_events_filename()
+    command = [
+        sys.executable,
+        path_to_dotest_py(),
+        "--inferior",
+        "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter",
+        "--results-file={}".format(events_filename),
+        "-f", filter_desc,
+        test_dir
+    ]
+    return _collect_events_with_command(command, events_filename)