test events: announce worker count in new initialize event

See the following for details:
http://reviews.llvm.org/D12987

llvm-svn: 248059
This commit is contained in:
Todd Fiala 2015-09-18 22:45:31 +00:00
parent e629a45531
commit e83f140833
4 changed files with 103 additions and 109 deletions

View File

@ -180,9 +180,14 @@ def call_with_timeout(command, timeout, name, inferior_pid_events):
command = [timeout_command, '-s', 'QUIT', timeout] + command
if GET_WORKER_INDEX is not None:
try:
worker_index = GET_WORKER_INDEX()
command.extend([
"--event-add-entries", "worker_index={}".format(worker_index)])
except:
# Ctrl-C does bad things to multiprocessing.Manager.dict() lookup.
pass
# Specifying a value for close_fds is unsupported on Windows when using
# subprocess.PIPE
if os.name != "nt":
@ -896,7 +901,7 @@ def walk_and_invoke(test_directory, test_subdir, dotest_argv,
# listener channel and tell the inferior to send results to the
# port on which we'll be listening.
if RESULTS_FORMATTER is not None:
forwarding_func = RESULTS_FORMATTER.process_event
forwarding_func = RESULTS_FORMATTER.handle_event
RESULTS_LISTENER_CHANNEL = (
dotest_channels.UnpicklingForwardingListenerChannel(
RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
@ -1184,15 +1189,6 @@ def main(print_details_on_success, num_threads, test_subdir,
for core in cores:
os.unlink(core)
if not num_threads:
num_threads_str = os.environ.get("LLDB_TEST_THREADS")
if num_threads_str:
num_threads = int(num_threads_str)
else:
num_threads = multiprocessing.cpu_count()
if num_threads < 1:
num_threads = 1
system_info = " ".join(platform.uname())
# Figure out which testrunner strategy we'll use.

View File

@ -1006,13 +1006,26 @@ def setupTestResults():
# Start the results formatter session - we'll only have one
# during a given dotest process invocation.
results_formatter_object.begin_session()
initialize_event = EventBuilder.bare_event("initialize")
if isMultiprocessTestRunner():
if test_runner_name is not None and test_runner_name == "serial":
# Only one worker queue here.
worker_count = 1
else:
# Workers will be the number of threads specified.
worker_count = num_threads
else:
worker_count = 1
initialize_event["worker_count"] = worker_count
results_formatter_object.handle_event(initialize_event)
def shutdown_formatter():
# Tell the formatter to write out anything it may have
# been saving until the very end (e.g. xUnit results
# can't complete its output until this point).
results_formatter_object.end_session()
terminate_event = EventBuilder.bare_event("terminate")
results_formatter_object.handle_event(terminate_event)
# And now close out the output file-like object.
if cleanup_func is not None:
@ -1873,7 +1886,7 @@ if __name__ == "__main__":
self.stream.write(self.fmt % self.counter)
super(LLDBTestResult, self).startTest(test)
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_start(test))
def addSuccess(self, test):
@ -1882,7 +1895,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("PASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_success(test))
def addError(self, test, err):
@ -1896,7 +1909,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("FAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_error(test, err))
def addCleanupError(self, test, err):
@ -1910,7 +1923,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("CLEANUP ERROR: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_cleanup_error(
test, err))
@ -1933,7 +1946,7 @@ if __name__ == "__main__":
else:
failuresPerCategory[category] = 1
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_failure(test, err))
@ -1948,7 +1961,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("XFAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_expected_failure(
test, err, bugnumber))
@ -1963,7 +1976,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("UNSUPPORTED: LLDB (%s) :: %s (%s) \n" % (self._config_string(test), str(test), reason))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_skip(test, reason))
def addUnexpectedSuccess(self, test, bugnumber):
@ -1977,7 +1990,7 @@ if __name__ == "__main__":
if parsable:
self.stream.write("XPASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
self.results_formatter.handle_event(
EventBuilder.event_for_unexpected_success(
test, bugnumber))

View File

@ -1,4 +1,5 @@
import sys
import multiprocessing
import os
import textwrap
@ -26,6 +27,16 @@ def parse_args(parser, argv):
else:
return parser.parse_args(args=argv)
def default_thread_count():
# Check if specified in the environment
num_threads_str = os.environ.get("LLDB_TEST_THREADS")
if num_threads_str:
return int(num_threads_str)
else:
return multiprocessing.cpu_count()
def create_parser():
parser = argparse.ArgumentParser(description='description', prefix_chars='+-', add_help=False)
group = None
@ -126,6 +137,7 @@ def create_parser():
'--threads',
type=int,
dest='num_threads',
default=default_thread_count(),
help=('The number of threads/processes to use when running tests '
'separately, defaults to the number of CPU cores available'))
group.add_argument(

View File

@ -39,6 +39,29 @@ class EventBuilder(object):
test_name = test_class_components[-1]
return (test_class_name, test_name)
@staticmethod
def bare_event(event_type):
"""Creates an event with default additions, event type and timestamp.
@param event_type the value set for the "event" key, used
to distinguish events.
@returns an event dictionary with all default additions, the "event"
key set to the passed in event_type, and the event_time value set to
time.time().
"""
if EventBuilder.BASE_DICTIONARY is not None:
# Start with a copy of the "always include" entries.
event = dict(EventBuilder.BASE_DICTIONARY)
else:
event = {}
event.update({
"event": event_type,
"event_time": time.time()
})
return event
@staticmethod
def _event_dictionary_common(test, event_type):
"""Returns an event dictionary setup with values for the given event type.
@ -51,18 +74,12 @@ class EventBuilder(object):
"""
test_class_name, test_name = EventBuilder._get_test_name_info(test)
if EventBuilder.BASE_DICTIONARY is not None:
# Start with a copy of the "always include" entries.
result = dict(EventBuilder.BASE_DICTIONARY)
else:
result = {}
result.update({
"event": event_type,
event = EventBuilder.bare_event(event_type)
event.update({
"test_class": test_class_name,
"test_name": test_name,
"event_time": time.time()
})
return result
return event
@staticmethod
def _error_tuple_class(error_tuple):
@ -278,14 +295,6 @@ class EventBuilder(object):
"""
EventBuilder.BASE_DICTIONARY = dict(entries_dict)
@staticmethod
def base_event():
"""@return the base event dictionary that all events should contain."""
if EventBuilder.BASE_DICTIONARY is not None:
return dict(EventBuilder.BASE_DICTIONARY)
else:
return None
class ResultsFormatter(object):
@ -312,6 +321,8 @@ class ResultsFormatter(object):
# Single call to session start, before parsing any events.
formatter.begin_session()
formatter.handle_event({"event":"initialize",...})
# Zero or more calls specified for events recorded during the test session.
# The parallel test runner manages getting results from all the inferior
# dotest processes, so from a new format perspective, don't worry about
@ -319,12 +330,12 @@ class ResultsFormatter(object):
# sandwiched between a single begin_session()/end_session() pair in the
# parallel test runner process/thread.
for event in zero_or_more_test_events():
formatter.process_event(event)
formatter.handle_event(event)
# Single call to session end. Formatters that need all the data before
# they can print a correct result (e.g. xUnit/JUnit), this is where
# the final report can be generated.
formatter.end_session()
# Single call to terminate/wrap-up. Formatters that need all the
# data before they can print a correct result (e.g. xUnit/JUnit),
# this is where the final report can be generated.
formatter.handle_event({"event":"terminate",...})
It is not the formatter's responsibility to close the file_like_object.
(i.e. do not close it).
@ -380,32 +391,8 @@ class ResultsFormatter(object):
# entirely consistent from the outside.
self.lock = threading.Lock()
def begin_session(self):
"""Begins a test session.
All process_event() calls must be sandwiched between
begin_session() and end_session() calls.
Derived classes may override this but should call this first.
"""
pass
def end_session(self):
"""Ends a test session.
All process_event() calls must be sandwiched between
begin_session() and end_session() calls.
All results formatting should be sent to the output
file object by the end of this call.
Derived classes may override this but should call this after
the dervied class's behavior is complete.
"""
pass
def process_event(self, test_event):
"""Processes the test event for collection into the formatter output.
def handle_event(self, test_event):
"""Handles the test event for collection into the formatter output.
Derived classes may override this but should call down to this
implementation first.
@ -573,17 +560,16 @@ class XunitFormatter(ResultsFormatter):
"unexpected_success": self._handle_unexpected_success
}
def begin_session(self):
super(XunitFormatter, self).begin_session()
def process_event(self, test_event):
super(XunitFormatter, self).process_event(test_event)
def handle_event(self, test_event):
super(XunitFormatter, self).handle_event(test_event)
event_type = test_event["event"]
if event_type is None:
return
if event_type == "test_start":
if event_type == "terminate":
self._finish_output()
elif event_type == "test_start":
self.track_start_time(
test_event["test_class"],
test_event["test_name"],
@ -805,7 +791,7 @@ class XunitFormatter(ResultsFormatter):
return result
def _end_session_internal(self):
def _finish_output_no_lock(self):
"""Flushes out the report of test executions to form valid xml output.
xUnit output is in XML. The reporting system cannot complete the
@ -850,11 +836,9 @@ class XunitFormatter(ResultsFormatter):
# Close off the test suite.
self.out_file.write('</testsuite></testsuites>\n')
super(XunitFormatter, self).end_session()
def end_session(self):
def _finish_output(self):
with self.lock:
self._end_session_internal()
self._finish_output_no_lock()
class RawPickledFormatter(ResultsFormatter):
@ -875,42 +859,31 @@ class RawPickledFormatter(ResultsFormatter):
super(RawPickledFormatter, self).__init__(out_file, options)
self.pid = os.getpid()
def begin_session(self):
super(RawPickledFormatter, self).begin_session()
event = EventBuilder.base_event()
if event is None:
event = {}
event.update({
"event": "session_begin",
"event_time": time.time(),
"pid": self.pid
})
self.process_event(event)
def handle_event(self, test_event):
super(RawPickledFormatter, self).handle_event(test_event)
def process_event(self, test_event):
super(RawPickledFormatter, self).process_event(test_event)
# Convert initialize/terminate events into job_begin/job_end events.
event_type = test_event["event"]
if event_type is None:
return
if event_type == "initialize":
test_event["event"] = "job_begin"
elif event_type == "terminate":
test_event["event"] = "job_end"
# Tack on the pid.
test_event["pid"] = self.pid
# Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes}
pickled_message = cPickle.dumps(test_event)
self.out_file.send(
"{}#{}".format(len(pickled_message), pickled_message))
def end_session(self):
event = EventBuilder.base_event()
if event is None:
event = {}
event.update({
"event": "session_end",
"event_time": time.time(),
"pid": self.pid
})
self.process_event(event)
super(RawPickledFormatter, self).end_session()
class DumpFormatter(ResultsFormatter):
"""Formats events to the file as their raw python dictionary format."""
def process_event(self, test_event):
super(DumpFormatter, self).process_event(test_event)
def handle_event(self, test_event):
super(DumpFormatter, self).handle_event(test_event)
self.out_file.write("\n" + pprint.pformat(test_event) + "\n")