[lit][NFC] Cleanup lit worker process handling

Move code that is executed on worker process to separate file. This
makes the use of the pickled arguments stored in global variables in the
worker a bit clearer. (Still not pretty though.)

Extract handling of parallelism groups to it's own function.

Use BoundedSemaphore instead of Semaphore. BoundedSemaphore raises for
unmatched release() calls.

Cleanup imports.

Differential Revision: https://reviews.llvm.org/D58196

llvm-svn: 354187
This commit is contained in:
Julian Lettner 2019-02-16 00:40:40 +00:00
parent 312af158b0
commit 0d15bb5d33
3 changed files with 110 additions and 107 deletions

View File

@ -1,28 +1,9 @@
import os
import sys
import threading
import time
import traceback
try:
import Queue as queue
except ImportError:
import queue
try:
import win32api
except ImportError:
win32api = None
import multiprocessing import multiprocessing
import lit.Test import time
def abort_now(): import lit.Test
"""Abort the current process without doing any exception teardown""" import lit.util
sys.stdout.flush() import lit.worker
if win32api:
win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
else:
os.kill(0, 9)
class _Display(object): class _Display(object):
def __init__(self, display, provider, maxFailures): def __init__(self, display, provider, maxFailures):
@ -48,12 +29,11 @@ class Run(object):
# For example, some ASan tests require lots of virtual memory and run # For example, some ASan tests require lots of virtual memory and run
# faster with less parallelism on OS X. # faster with less parallelism on OS X.
self.parallelism_semaphores = \ self.parallelism_semaphores = \
{k: multiprocessing.Semaphore(v) for k, v in {k: multiprocessing.BoundedSemaphore(v) for k, v in
self.lit_config.parallelism_groups.items()} self.lit_config.parallelism_groups.items()}
def execute_test(self, test): def execute_test(self, test):
return _execute_test_impl(test, self.lit_config, return lit.worker._execute_test(test, self.lit_config)
self.parallelism_semaphores)
def execute_tests_in_pool(self, jobs, max_time): def execute_tests_in_pool(self, jobs, max_time):
# We need to issue many wait calls, so compute the final deadline and # We need to issue many wait calls, so compute the final deadline and
@ -67,22 +47,22 @@ class Run(object):
# interrupts the workers before we make it into our task callback, they # interrupts the workers before we make it into our task callback, they
# will each raise a KeyboardInterrupt exception and print to stderr at # will each raise a KeyboardInterrupt exception and print to stderr at
# the same time. # the same time.
pool = multiprocessing.Pool(jobs, worker_initializer, pool = multiprocessing.Pool(jobs, lit.worker.initializer,
(self.lit_config, (self.lit_config,
self.parallelism_semaphores)) self.parallelism_semaphores))
# Install a console-control signal handler on Windows. # Install a console-control signal handler on Windows.
if win32api is not None: if lit.util.win32api is not None:
def console_ctrl_handler(type): def console_ctrl_handler(type):
print('\nCtrl-C detected, terminating.') print('\nCtrl-C detected, terminating.')
pool.terminate() pool.terminate()
pool.join() pool.join()
abort_now() lit.util.abort_now()
return True return True
win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
try: try:
async_results = [pool.apply_async(worker_run_one_test, async_results = [pool.apply_async(lit.worker.run_one_test,
args=(test_index, test), args=(test_index, test),
callback=self.consume_test_result) callback=self.consume_test_result)
for test_index, test in enumerate(self.tests)] for test_index, test in enumerate(self.tests)]
@ -143,11 +123,9 @@ class Run(object):
self.failure_count = 0 self.failure_count = 0
self.hit_max_failures = False self.hit_max_failures = False
if jobs == 1: if jobs == 1:
global child_lit_config
child_lit_config = self.lit_config
for test_index, test in enumerate(self.tests): for test_index, test in enumerate(self.tests):
result = worker_run_one_test(test_index, test) lit.worker._execute_test(test, self.lit_config)
self.consume_test_result(result) self.consume_test_result((test_index, test))
if self.hit_max_failures: if self.hit_max_failures:
break break
else: else:
@ -159,7 +137,7 @@ class Run(object):
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
def consume_test_result(self, pool_result): def consume_test_result(self, pool_result):
"""Test completion callback for worker_run_one_test """Test completion callback for lit.worker.run_one_test
Updates the test result status in the parent process. Each task in the Updates the test result status in the parent process. Each task in the
pool returns the test index and the result, and we use the index to look pool returns the test index and the result, and we use the index to look
@ -186,74 +164,3 @@ class Run(object):
if self.lit_config.maxFailures and \ if self.lit_config.maxFailures and \
self.failure_count == self.lit_config.maxFailures: self.failure_count == self.lit_config.maxFailures:
self.hit_max_failures = True self.hit_max_failures = True
def _execute_test_impl(test, lit_config, parallelism_semaphores):
"""Execute one test"""
pg = test.config.parallelism_group
if callable(pg):
pg = pg(test)
result = None
semaphore = None
try:
if pg:
semaphore = parallelism_semaphores[pg]
if semaphore:
semaphore.acquire()
start_time = time.time()
result = test.config.test_format.execute(test, lit_config)
# Support deprecated result from execute() which returned the result
# code and additional output as a tuple.
if isinstance(result, tuple):
code, output = result
result = lit.Test.Result(code, output)
elif not isinstance(result, lit.Test.Result):
raise ValueError("unexpected result from test execution")
result.elapsed = time.time() - start_time
except KeyboardInterrupt:
raise
except:
if lit_config.debug:
raise
output = 'Exception during script execution:\n'
output += traceback.format_exc()
output += '\n'
result = lit.Test.Result(lit.Test.UNRESOLVED, output)
finally:
if semaphore:
semaphore.release()
test.setResult(result)
child_lit_config = None
child_parallelism_semaphores = None
def worker_initializer(lit_config, parallelism_semaphores):
"""Copy expensive repeated data into worker processes"""
global child_lit_config
child_lit_config = lit_config
global child_parallelism_semaphores
child_parallelism_semaphores = parallelism_semaphores
def worker_run_one_test(test_index, test):
"""Run one test in a multiprocessing.Pool
Side effects in this function and functions it calls are not visible in the
main lit process.
Arguments and results of this function are pickled, so they should be cheap
to copy. For efficiency, we copy all data needed to execute all tests into
each worker and store it in the child_* global variables. This reduces the
cost of each task.
Returns an index and a Result, which the parent process uses to update
the display.
"""
try:
_execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
return (test_index, test)
except KeyboardInterrupt as e:
# If a worker process gets an interrupt, abort it immediately.
abort_now()
except:
traceback.print_exc()

View File

@ -424,3 +424,17 @@ def killProcessAndChildren(pid):
psutilProc.kill() psutilProc.kill()
except psutil.NoSuchProcess: except psutil.NoSuchProcess:
pass pass
try:
import win32api
except ImportError:
win32api = None
def abort_now():
"""Abort the current process without doing any exception teardown"""
sys.stdout.flush()
if win32api:
win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
else:
os.kill(0, 9)

View File

@ -0,0 +1,82 @@
# The functions in this module are meant to run on a separate worker process.
# Exception: in single process mode _execute_test is called directly.
import time
import traceback
import lit.Test
import lit.util
_lit_config = None
_parallelism_semaphores = None
def initializer(lit_config, parallelism_semaphores):
"""Copy expensive repeated data into worker processes"""
global _lit_config
global _parallelism_semaphores
_lit_config = lit_config
_parallelism_semaphores = parallelism_semaphores
def run_one_test(test_index, test):
"""Run one test in a multiprocessing.Pool
Side effects in this function and functions it calls are not visible in the
main lit process.
Arguments and results of this function are pickled, so they should be cheap
to copy. For efficiency, we copy all data needed to execute all tests into
each worker and store it in the worker_* global variables. This reduces the
cost of each task.
Returns an index and a Result, which the parent process uses to update
the display.
"""
try:
_execute_test_in_parallelism_group(test, _lit_config,
_parallelism_semaphores)
return (test_index, test)
except KeyboardInterrupt:
# If a worker process gets an interrupt, abort it immediately.
lit.util.abort_now()
except:
traceback.print_exc()
def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
"""Execute one test inside the appropriate parallelism group"""
pg = test.config.parallelism_group
if callable(pg):
pg = pg(test)
if pg:
semaphore = parallelism_semaphores[pg]
try:
semaphore.acquire()
_execute_test(test, lit_config)
finally:
semaphore.release()
else:
_execute_test(test, lit_config)
def _execute_test(test, lit_config):
"""Execute one test"""
try:
start_time = time.time()
result = test.config.test_format.execute(test, lit_config)
# Support deprecated result from execute() which returned the result
# code and additional output as a tuple.
if isinstance(result, tuple):
code, output = result
result = lit.Test.Result(code, output)
elif not isinstance(result, lit.Test.Result):
raise ValueError("unexpected result from test execution")
result.elapsed = time.time() - start_time
except KeyboardInterrupt:
raise
except:
if lit_config.debug:
raise
output = 'Exception during script execution:\n'
output += traceback.format_exc()
output += '\n'
result = lit.Test.Result(lit.Test.UNRESOLVED, output)
test.setResult(result)