diff options
-rw-r--r-- | buildbot/builderstage.py | 3 | ||||
-rwxr-xr-x | buildbot/cbuildbot_config.py | 7 | ||||
-rwxr-xr-x | buildbot/cbuildbot_config_unittest.py | 10 | ||||
-rw-r--r-- | lib/parallel.py | 75 | ||||
-rwxr-xr-x | lib/parallel_unittest.py | 24 |
5 files changed, 98 insertions, 21 deletions
diff --git a/buildbot/builderstage.py b/buildbot/builderstage.py index 11811a6b8..f6eae9c96 100644 --- a/buildbot/builderstage.py +++ b/buildbot/builderstage.py @@ -282,6 +282,9 @@ class BuilderStage(object): if result not in (results_lib.Results.FORGIVEN, results_lib.Results.SUCCESS): raise results_lib.StepFailure() + except BaseException as e: + result, description = self._HandleStageException(e) + raise finally: elapsed_time = time.time() - start_time results_lib.Results.Record(self.name, result, description, diff --git a/buildbot/cbuildbot_config.py b/buildbot/cbuildbot_config.py index f4982460f..2b81b152d 100755 --- a/buildbot/cbuildbot_config.py +++ b/buildbot/cbuildbot_config.py @@ -43,8 +43,10 @@ CONFIG_TYPE_DUMP_ORDER = ( 'refresh-packages', ) +# hw_tests_timeout -- Usually, 2 hours and ten minutes. This must be less than +# lib.parallel._BackgroundTask.MINIMUM_SILENT_TIMEOUT. DEFAULT_HW_TESTS = ['bvt'] -DEFAULT_HW_TEST_TIMEOUT = 8400 +DEFAULT_HW_TEST_TIMEOUT = 60 * 130 def OverrideConfigForTrybot(build_config, options): """Apply trybot-specific configuration settings. @@ -318,7 +320,8 @@ _settings = dict( # them to the perf dashboard. hw_copy_perf_results=False, -# hw_tests_timeout -- Usually, 2 hours and twenty minutes. +# hw_tests_timeout -- Usually, 2 hours and ten minutes. This must be less than +# lib.parallel._BackgroundTask.MINIMUM_SILENT_TIMEOUT. hw_tests_timeout=DEFAULT_HW_TEST_TIMEOUT, # hw_tests_pool -- Pool to use for hw testing. diff --git a/buildbot/cbuildbot_config_unittest.py b/buildbot/cbuildbot_config_unittest.py index 78df5dded..3b4333af8 100755 --- a/buildbot/cbuildbot_config_unittest.py +++ b/buildbot/cbuildbot_config_unittest.py @@ -18,6 +18,7 @@ sys.path.insert(0, constants.SOURCE_ROOT) from chromite.buildbot import cbuildbot_config from chromite.lib import cros_test_lib from chromite.lib import git +from chromite.lib import parallel CHROMIUM_WATCHING_URL = ("http://src.chromium.org/viewvc/" + "chrome/trunk/tools/build/masters/" + @@ -228,6 +229,15 @@ class CBuildBotTest(cros_test_lib.MoxTestCase): "%s is trying to run hw tests without uploading payloads." % build_name) + def testHWTestTimeout(self): + """Verify that hw test timeout is in a reasonable range.""" + # The parallel library will kill the process if it's silent for longer + # than the silent timeout. + max_timeout = parallel._BackgroundTask.MINIMUM_SILENT_TIMEOUT + for build_name, config in cbuildbot_config.config.iteritems(): + self.assertTrue(config['hw_tests_timeout'] < max_timeout, + '%s has a hw_tests_timeout that is too large.' % build_name) + def testValidUnifiedMasterConfig(self): """Make sure any unified master configurations are valid.""" for build_name, config in cbuildbot_config.config.iteritems(): diff --git a/lib/parallel.py b/lib/parallel.py index ae4cda60a..9cc86a8fd 100644 --- a/lib/parallel.py +++ b/lib/parallel.py @@ -43,6 +43,17 @@ class _BackgroundTask(multiprocessing.Process): STARTUP_TIMEOUT = 60 * 5 EXIT_TIMEOUT = 60 * 10 + # The time we allow processes to be silent. This must be greater than the + # hw_test_timeout set in cbuildbot_config.py, and less than the timeout set + # by buildbot itself (typically, 150 minutes.) + SILENT_TIMEOUT = 60 * 145 + + # The amount by which we reduce the SILENT_TIMEOUT every time we launch + # a subprocess. This helps ensure that children get a chance to enforce the + # SILENT_TIMEOUT prior to the parents enforcing it. + SILENT_TIMEOUT_STEP = 30 + MINIMUM_SILENT_TIMEOUT = 60 * 135 + # The time before terminating or killing a task. SIGTERM_TIMEOUT = 30 SIGKILL_TIMEOUT = 60 @@ -79,6 +90,7 @@ class _BackgroundTask(multiprocessing.Process): sig: Signal to send. log_level: The log level of log messages. """ + self._killing.set() self._WaitForStartup() if logger.isEnabledFor(log_level): # Print a pstree of the hanging process. @@ -87,7 +99,6 @@ class _BackgroundTask(multiprocessing.Process): debug_level=log_level, error_code_ok=True, log_output=True) - self._killing.set() try: os.kill(self.pid, sig) except OSError as ex: @@ -125,39 +136,48 @@ class _BackgroundTask(multiprocessing.Process): # File position pointers are shared across processes, so we must open # our own file descriptor to ensure output is not lost. self._WaitForStartup() + silent_death_time = time.time() + self.SILENT_TIMEOUT results = [] with open(self._output.name, 'r') as output: pos = 0 - more_output = True - exited_cleanly = False - while more_output: + running, exited_cleanly, msg, error = (True, False, None, None) + while running: # Check whether the process is still alive. - more_output = self.is_alive() + running = self.is_alive() try: error, results = self._queue.get(True, self.PRINT_INTERVAL) - more_output = False + running = False exited_cleanly = True except Queue.Empty: pass - if not more_output: + if not running: # Wait for the process to actually exit. If the child doesn't exit # in a timely fashion, kill it. self.join(self.EXIT_TIMEOUT) if self.exitcode is None: msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT) - logger.warning(msg) self._KillChildren([self]) - error = (error + '\n%s' % msg) if error else msg elif not exited_cleanly: - error = ('%r exited unexpectedly with code %s' % - (self, self.exitcode)) - + msg = ('%r exited unexpectedly with code %s' % + (self, self.EXIT_TIMEOUT)) # Read output from process. output.seek(pos) buf = output.read(_BUFSIZE) + if len(buf) > 0: + silent_death_time = time.time() + self.SILENT_TIMEOUT + elif running and time.time() > silent_death_time: + msg = ('No output from %r for %r seconds' % + (self, self.SILENT_TIMEOUT)) + self._KillChildren([self]) + + # Read remaining output from the process. + output.seek(pos) + buf = output.read(_BUFSIZE) + running = False + # Print output so far. while len(buf) > 0: sys.stdout.write(buf) @@ -165,7 +185,16 @@ class _BackgroundTask(multiprocessing.Process): if len(buf) < _BUFSIZE: break buf = output.read(_BUFSIZE) + + # Print error messages if anything exceptional occurred. + if msg: + cros_build_lib.PrintBuildbotStepFailure() + error = '\n'.join(x for x in (error, msg) if x) + logger.warning(error) + traceback.print_stack() + sys.stdout.flush() + sys.stderr.flush() # Propagate any results. for result in results: @@ -179,6 +208,9 @@ class _BackgroundTask(multiprocessing.Process): def start(self): """Invoke multiprocessing.Process.start after flushing output/err.""" + if self.SILENT_TIMEOUT < self.MINIMUM_SILENT_TIMEOUT: + raise AssertionError('Maximum recursion depth exceeded in %r' % self) + sys.stdout.flush() sys.stderr.flush() self._output = tempfile.NamedTemporaryFile(delete=False, bufsize=0, @@ -196,7 +228,7 @@ class _BackgroundTask(multiprocessing.Process): try: error = self._Run() finally: - if os.getpid() == pid: + if not self._killing.is_set() and os.getpid() == pid: results = results_lib.Results.Get() self._queue.put((error, results)) if self._semaphore is not None: @@ -232,17 +264,23 @@ class _BackgroundTask(multiprocessing.Process): try: self._started.set() results_lib.Results.Clear() + + # Reduce the silent timeout by the prescribed amount. + cls = self.__class__ + cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP + + # Actually launch the task. self._task() except results_lib.StepFailure as ex: error = str(ex) except BaseException as ex: error = traceback.format_exc() + if self._killing.is_set(): + traceback.print_exc() + finally: + sys.stdout.flush() + sys.stderr.flush() - if self._killing.is_set(): - output.write(error) - - sys.stdout.flush() - sys.stderr.flush() return error @classmethod @@ -256,7 +294,6 @@ class _BackgroundTask(multiprocessing.Process): bg_tasks: A list filled with _BackgroundTask objects. log_level: The log level of log messages. """ - logger.log(log_level, 'Killing tasks: %r', bg_tasks) signals = ((signal.SIGINT, cls.SIGTERM_TIMEOUT), (signal.SIGTERM, cls.SIGKILL_TIMEOUT), diff --git a/lib/parallel_unittest.py b/lib/parallel_unittest.py index 528befa61..9d6b8328b 100755 --- a/lib/parallel_unittest.py +++ b/lib/parallel_unittest.py @@ -14,6 +14,7 @@ import time import Queue sys.path.insert(0, os.path.abspath('%s/../../..' % __file__)) +from chromite.lib import cros_build_lib from chromite.lib import cros_test_lib from chromite.lib import osutils from chromite.lib import parallel @@ -286,6 +287,29 @@ class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper): self.testExceptionRaising() self.assertEqual(os.listdir(tempdir), []) + def testKillQuiet(self, steps=None, **kwds): + """Test that processes do get killed if they're silent for too long.""" + if steps is None: + steps = [self._Fail] * 10 + kwds.setdefault('SILENT_TIMEOUT', 0.1) + kwds.setdefault('MINIMUM_SILENT_TIMEOUT', 0.01) + kwds.setdefault('SILENT_TIMEOUT_STEP', 0) + kwds.setdefault('SIGTERM_TIMEOUT', 0.1) + kwds.setdefault('PRINT_INTERVAL', 0.01) + + ex_str = None + with mock.patch.multiple(parallel._BackgroundTask, **kwds): + with self.OutputCapturer() as capture: + try: + with cros_test_lib.DisableLogger(parallel.logger): + with cros_test_lib.DisableLogger(cros_build_lib.logger): + parallel.RunParallelSteps(steps) + except parallel.BackgroundFailure as ex: + ex_str = str(ex) + error_str = capture.GetStderr() + self.assertTrue('parallel_unittest.py' in error_str) + self.assertTrue(ex_str) + if __name__ == '__main__': # Set timeouts small so that if the unit test hangs, it won't hang for long. |