diff options
Diffstat (limited to 'python/helpers/pycharm')
-rw-r--r-- | python/helpers/pycharm/_bdd_utils.py | 201 | ||||
-rw-r--r-- | python/helpers/pycharm/behave_runner.py | 242 | ||||
-rw-r--r-- | python/helpers/pycharm/lettuce_runner.py | 158 | ||||
-rw-r--r-- | python/helpers/pycharm/tcunittest.py | 87 |
4 files changed, 580 insertions, 108 deletions
diff --git a/python/helpers/pycharm/_bdd_utils.py b/python/helpers/pycharm/_bdd_utils.py new file mode 100644 index 000000000000..0c92532b516c --- /dev/null +++ b/python/helpers/pycharm/_bdd_utils.py @@ -0,0 +1,201 @@ +# coding=utf-8 +""" +Tools for running BDD frameworks in python. +You probably need to extend BddRunner (see its doc). + +You may also need "get_path_by_args" that gets folder (current or passed as first argument) +""" +import os +import time +import abc + +import tcmessages + + +__author__ = 'Ilya.Kazakevich' + + +def get_path_by_args(arguments): + """ + :type arguments list + :param arguments: arguments (sys.argv) + :return: tuple (base_dir, what_to_run) where dir is current or first argument from argv, checking it exists + :rtype tuple of str + """ + what_to_run = arguments[1] if len(arguments) > 1 else "." + base_dir = what_to_run + assert os.path.exists(what_to_run), "{} does not exist".format(what_to_run) + + if os.path.isfile(what_to_run): + base_dir = os.path.dirname(what_to_run) # User may point to the file directly + return base_dir, what_to_run + + +class BddRunner(object): + """ + Extends this class, implement abstract methods and use its API to implement new BDD frameworks. + Call "run()" to launch it. + This class does the following: + * Gets features to run (using "_get_features_to_run()") and calculates steps in it + * Reports steps to Intellij or TC + * Calls "_run_tests()" where *you* should install all hooks you need into your BDD and use "self._" functions + to report tests and features. It actually wraps tcmessages but adds some stuff like duration count etc + :param base_dir: + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, base_dir): + """ + :type base_dir str + :param base_dir base directory of your project + """ + super(BddRunner, self).__init__() + self.tc_messages = tcmessages.TeamcityServiceMessages() + """ + tcmessages TeamCity/Intellij test API. See TeamcityServiceMessages + """ + self.__base_dir = base_dir + self.__last_test_start_time = None # TODO: Doc when use + self.__last_test_name = None + + def run(self): + """" + Runs runner. To be called right after constructor. + """ + self.tc_messages.testCount(self._get_number_of_tests()) + self.tc_messages.testMatrixEntered() + self._run_tests() + + def __gen_location(self, location): + """ + Generates location in format, supported by tcmessages + :param location object with "file" (relative to base_dir) and "line" fields. + :return: location in format file:line (as supported in tcmessages) + """ + my_file = str(location.file).lstrip("/\\") + return "file:///{}:{}".format(os.path.normpath(os.path.join(self.__base_dir, my_file)), location.line) + + def _test_undefined(self, test_name, location): + """ + Mark test as undefined + :param test_name: name of test + :type test_name str + :param location its location + + """ + if test_name != self.__last_test_name: + self._test_started(test_name, location) + self._test_failed(test_name, message="Test undefined", details="Please define test") + + def _test_skipped(self, test_name, reason, location): + """ + Mark test as skipped + :param test_name: name of test + :param reason: why test was skipped + :type reason str + :type test_name str + :param location its location + + """ + if test_name != self.__last_test_name: + self._test_started(test_name, location) + self.tc_messages.testIgnored(test_name, "Skipped: {}".format(reason)) + self.__last_test_name = None + pass + + def _test_failed(self, name, message, details): + """ + Report test failure + :param name: test name + :type name str + :param message: failure message + :type message str + :param details: failure details (probably stacktrace) + :type details str + """ + self.tc_messages.testFailed(name, message=message, details=details) + self.__last_test_name = None + + def _test_passed(self, name, duration=None): + """ + Reports test passed + :param name: test name + :type name str + :param duration: time (in seconds) test took. Pass None if you do not know (we'll try to calculate it) + :type duration int + :return: + """ + duration_to_report = duration + if self.__last_test_start_time and not duration: # And not provided + duration_to_report = int(time.time() - self.__last_test_start_time) + self.tc_messages.testFinished(name, duration=int(duration_to_report)) + self.__last_test_start_time = None + self.__last_test_name = None + + def _test_started(self, name, location): + """ + Reports test launched + :param name: test name + :param location object with "file" (relative to base_dir) and "line" fields. + :type name str + """ + self.__last_test_start_time = time.time() + self.__last_test_name = name + self.tc_messages.testStarted(name, self.__gen_location(location)) + + def _feature_or_scenario(self, is_started, name, location): + """ + Reports feature or scenario launched or stopped + :param is_started: started or finished? + :type is_started bool + :param name: scenario or feature name + :param location object with "file" (relative to base_dir) and "line" fields. + """ + if is_started: + self.tc_messages.testSuiteStarted(name, self.__gen_location(location)) + else: + self.tc_messages.testSuiteFinished(name) + + def _background(self, is_started, location): + """ + Reports background or stopped + :param is_started: started or finished? + :type is_started bool + :param location object with "file" (relative to base_dir) and "line" fields. + """ + self._feature_or_scenario(is_started, "Background", location) + + def _get_number_of_tests(self): + """" + Gets number of tests using "_get_features_to_run()" to obtain number of features to calculate. + Supports backgrounds as well. + :return number of steps + :rtype int + """ + num_of_steps = 0 + for feature in self._get_features_to_run(): + if feature.background: + num_of_steps += len(feature.background.steps) * len(feature.scenarios) + for scenario in feature.scenarios: + num_of_steps += len(scenario.steps) + return num_of_steps + + @abc.abstractmethod + def _get_features_to_run(self): + """ + Implement it! Return list of features to run. Each "feature" should have "scenarios". + Each "scenario" should have "steps". Each "feature" may have "background" and each "background" should have + "steps". Duck typing. + :rtype list + :returns list of features + """ + return [] + + @abc.abstractmethod + def _run_tests(self): + """ + Implement it! It should launch tests using your BDD. Use "self._" functions to report results. + """ + pass + + diff --git a/python/helpers/pycharm/behave_runner.py b/python/helpers/pycharm/behave_runner.py new file mode 100644 index 000000000000..4a1b2f6557c5 --- /dev/null +++ b/python/helpers/pycharm/behave_runner.py @@ -0,0 +1,242 @@ +# coding=utf-8 +""" +Behave BDD runner. +*FIRST* param now: folder to search "features" for. +Each "features" folder should have features and "steps" subdir. + +Other args are tag expressionsin format (--tags=.. --tags=..). +See https://pythonhosted.org/behave/behave.html#tag-expression +""" +import functools +import sys +import os +import traceback + +from behave.formatter.base import Formatter +from behave.model import Step, ScenarioOutline, Feature, Scenario +from behave.tag_expression import TagExpression + +import _bdd_utils + + +_MAX_STEPS_SEARCH_FEATURES = 5000 # Do not look for features in folder that has more that this number of children +_FEATURES_FOLDER = 'features' # "features" folder name. + +__author__ = 'Ilya.Kazakevich' + +from behave import configuration, runner +from behave.formatter import formatters + + +def _get_dirs_to_run(base_dir_to_search): + """ + Searches for "features" dirs in some base_dir + :return: list of feature dirs to run + :rtype: list + :param base_dir_to_search root directory to search (should not have too many children!) + :type base_dir_to_search str + + """ + result = set() + for (step, (folder, sub_folders, files)) in enumerate(os.walk(base_dir_to_search)): + if os.path.basename(folder) == _FEATURES_FOLDER and os.path.isdir(folder): + result.add(os.path.abspath(folder)) + if step == _MAX_STEPS_SEARCH_FEATURES: # Guard + err = "Folder {} is too deep to find any features folder. Please provider concrete folder".format( + base_dir_to_search) + raise Exception(err) + return list(result) + + +def _merge_hooks_wrapper(*hooks): + """ + Creates wrapper that runs provided behave hooks sequentally + :param hooks: hooks to run + :return: wrapper + """ + # TODO: Wheel reinvented!!!! + def wrapper(*args, **kwargs): + for hook in hooks: + hook(*args, **kwargs) + + return wrapper + + +class _RunnerWrapper(runner.Runner): + """ + Wrapper around behave native wrapper. Has nothing todo with BddRunner! + We need it to support dry runs (to fetch data from scenarios) and hooks api + """ + + def __init__(self, config, hooks): + """ + :type config configuration.Configuration + :param config behave configuration + :type hooks dict + :param hooks hooks in format "before_scenario" => f(context, scenario) to load after/before hooks, provided by user + """ + super(_RunnerWrapper, self).__init__(config) + self.dry_run = False + """ + Does not run tests (only fetches "self.features") if true. Runs tests otherwise. + """ + self.__hooks = hooks + + def load_hooks(self, filename='environment.py'): + """ + Overrides parent "load_hooks" to add "self.__hooks" + :param filename: env. file name + """ + super(_RunnerWrapper, self).load_hooks(filename) + for (hook_name, hook) in self.__hooks.items(): + hook_to_add = hook + if hook_name in self.hooks: + user_hook = self.hooks[hook_name] + if hook_name.startswith("before"): + user_and_custom_hook = [user_hook, hook] + else: + user_and_custom_hook = [hook, user_hook] + hook_to_add = _merge_hooks_wrapper(*user_and_custom_hook) + self.hooks[hook_name] = hook_to_add + + def run_model(self, features=None): + """ + Overrides parent method to stop (do nothing) in case of "dry_run" + :param features: features to run + :return: + """ + if self.dry_run: # To stop further execution + return + return super(_RunnerWrapper, self).run_model(features) + + def clean(self): + """ + Cleans runner after dry run (clears hooks, features etc). To be called before real run! + """ + self.dry_run = False + self.hooks.clear() + self.features = [] + + +class _BehaveRunner(_bdd_utils.BddRunner): + """ + BddRunner for behave + """ + + + def __process_hook(self, is_started, context, element): + """ + Hook to be installed. Reports steps, features etc. + :param is_started true if test/feature/scenario is started + :type is_started bool + :param context behave context + :type context behave.runner.Context + :param element feature/suite/step + """ + element.location.file = element.location.filename # To preserve _bdd_utils contract + if isinstance(element, Step): + # Process step + if is_started: + self._test_started(element.name, element.location) + elif element.status == 'passed': + self._test_passed(element.name, element.duration) + elif element.status == 'failed': + try: + trace = traceback.format_exc() + except Exception: + trace = "".join(traceback.format_tb(element.exc_traceback)) + self._test_failed(element.name, element.error_message, trace) + elif element.status == 'undefined': + self._test_undefined(element.name, element.location) + else: + self._test_skipped(element.name, element.status, element.location) + elif not is_started and isinstance(element, Scenario) and element.status == 'failed': + # To process scenarios with undefined/skipped tests + for step in element.steps: + assert isinstance(step, Step), step + if step.status not in ['passed', 'failed']: # Something strange, probably skipped or undefined + self.__process_hook(False, context, step) + self._feature_or_scenario(is_started, element.name, element.location) + elif isinstance(element, ScenarioOutline): + self._feature_or_scenario(is_started, str(element.examples), element.location) + else: + self._feature_or_scenario(is_started, element.name, element.location) + + def __init__(self, config, base_dir): + """ + :type config configuration.Configuration + """ + super(_BehaveRunner, self).__init__(base_dir) + self.__config = config + # Install hooks + self.__real_runner = _RunnerWrapper(config, { + "before_feature": functools.partial(self.__process_hook, True), + "after_feature": functools.partial(self.__process_hook, False), + "before_scenario": functools.partial(self.__process_hook, True), + "after_scenario": functools.partial(self.__process_hook, False), + "before_step": functools.partial(self.__process_hook, True), + "after_step": functools.partial(self.__process_hook, False) + }) + + def _run_tests(self): + self.__real_runner.run() + + + def __filter_scenarios_by_tag(self, scenario): + """ + Filters out scenarios that should be skipped by tags + :param scenario scenario to check + :return true if should pass + """ + assert isinstance(scenario, Scenario), scenario + expected_tags = self.__config.tags + if not expected_tags: + return True # No tags are required + return isinstance(expected_tags, TagExpression) and expected_tags.check(scenario.tags) + + + def _get_features_to_run(self): + self.__real_runner.dry_run = True + self.__real_runner.run() + features_to_run = self.__real_runner.features + self.__real_runner.clean() # To make sure nothing left after dry run + + # Change outline scenario skeletons with real scenarios + for feature in features_to_run: + assert isinstance(feature, Feature), feature + scenarios = [] + for scenario in feature.scenarios: + if isinstance(scenario, ScenarioOutline): + scenarios.extend(scenario.scenarios) + else: + scenarios.append(scenario) + feature.scenarios = filter(self.__filter_scenarios_by_tag, scenarios) + + return features_to_run + + +if __name__ == "__main__": + # TODO: support all other params instead + + class _Null(Formatter): + """ + Null formater to prevent stdout output + """ + pass + + command_args = list(filter(None, sys.argv[1:])) + my_config = configuration.Configuration(command_args=command_args) + formatters.register_as(_Null, "com.intellij.python.null") + my_config.format = ["com.intellij.python.null"] # To prevent output to stdout + my_config.reporters = [] # To prevent summary to stdout + my_config.stdout_capture = False # For test output + my_config.stderr_capture = False # For test output + (base_dir, what_to_run) = _bdd_utils.get_path_by_args(sys.argv) + if not my_config.paths: # No path provided, trying to load dit manually + if os.path.isfile(what_to_run): # File is provided, load it + my_config.paths = [what_to_run] + else: # Dir is provided, find subdirs ro run + my_config.paths = _get_dirs_to_run(base_dir) + _BehaveRunner(my_config, base_dir).run() + + diff --git a/python/helpers/pycharm/lettuce_runner.py b/python/helpers/pycharm/lettuce_runner.py index 6aaa566df719..3cd112540e5f 100644 --- a/python/helpers/pycharm/lettuce_runner.py +++ b/python/helpers/pycharm/lettuce_runner.py @@ -1,132 +1,112 @@ # coding=utf-8 """ BDD lettuce framework runner +TODO: Support other params (like tags) as well. +Supports only 1 param now: folder to search "features" for. """ +import _bdd_utils + __author__ = 'Ilya.Kazakevich' -import os from lettuce.exceptions import ReasonToFail -import time import sys -import tcmessages import lettuce from lettuce import core -# Error message about unsupported outlines -_NO_OUTLINE_ERROR = "Outline scenarios are not supported due to https://github.com/gabrielfalcao/lettuce/issues/451" - - -class LettuceRunner(object): +class _LettuceRunner(_bdd_utils.BddRunner): """ - TODO: Runs lettuce + Lettuce runner (BddRunner for lettuce) """ - def __init__(self, base_dir): + def __init__(self, base_dir, what_to_run): """ + :param base_dir base directory to run tests in :type base_dir: str - - """ - self.base_dir = base_dir - self.runner = lettuce.Runner(base_dir) - self.messages = tcmessages.TeamcityServiceMessages() - self.test_start_time = None - - def report_tests(self): - """ - :returns : number of tests - :rtype : int - """ - result = 0 - for feature_file in self.runner.loader.find_feature_files(): - feature = core.Feature.from_file(feature_file) - for scenario in feature.scenarios: - assert isinstance(scenario, core.Scenario), scenario - if not scenario.outlines: - result += len(scenario.steps) - self.messages.testCount(result) - - def report_scenario_started(self, scenario): + :param what_to_run folder or file to run + :type what_to_run str """ - Reports scenario launched - :type scenario core.Scenario - :param scenario: scenario - """ - if scenario.outlines: - self.messages.testIgnored(scenario.name, - _NO_OUTLINE_ERROR) - scenario.steps = [] # Clear to prevent running. TODO: Fix when this issue fixed - scenario.background = None # TODO: undocumented - return - self.report_suite(True, scenario.name, scenario.described_at) + super(_LettuceRunner, self).__init__(base_dir) + self.__runner = lettuce.Runner(what_to_run) - def report_suite(self, is_start, name, described_at): - """ - Reports some suite (scenario, feature, background etc) is started or stopped - :param is_start: started or not - :param name: suite name - :param described_at: where it is described (file, line) - :return: - """ - if is_start: - self.messages.testSuiteStarted(name, self._gen_location(described_at)) - else: - self.messages.testSuiteFinished(name) + def _get_features_to_run(self): + super(_LettuceRunner, self)._get_features_to_run() + if self.__runner.single_feature: # We need to run one and only one feature + return [core.Feature.from_file(self.__runner.single_feature)] - def report_step(self, is_start, step): + # Find all features in dir + features = [] + for feature_file in self.__runner.loader.find_feature_files(): + feature = core.Feature.from_file(feature_file) + assert isinstance(feature, core.Feature), feature + # TODO: cut out due to https://github.com/gabrielfalcao/lettuce/issues/451 Fix when this issue fixed + feature.scenarios = filter(lambda s: not s.outlines, feature.scenarios) + if feature.scenarios: + features.append(feature) + return features + + def _run_tests(self): + super(_LettuceRunner, self)._run_tests() + self.__install_hooks() + self.__runner.run() + + def __step(self, is_started, step): """ Reports step start / stop - :param is_start: true if step started :type step core.Step :param step: step """ test_name = step.sentence - if is_start: - self.test_start_time = time.time() - self.messages.testStarted(test_name, self._gen_location(step.described_at)) + if is_started: + self._test_started(test_name, step.described_at) elif step.passed: - duration = 0 - if self.test_start_time: - duration = long(time.time() - self.test_start_time) - self.messages.testFinished(test_name, duration=duration) - self.test_start_time = None + self._test_passed(test_name) elif step.failed: reason = step.why assert isinstance(reason, ReasonToFail), reason - self.messages.testFailed(test_name, message=reason.exception, details=reason.traceback) - - def _gen_location(self, description): - """ - :param description: "described_at" (file, line) - :return: location in format file:line by "described_at" - """ - return "file:///{}/{}:{}".format(self.base_dir, description.file, description.line) + self._test_failed(test_name, message=reason.exception, details=reason.traceback) + elif step.has_definition: + self._test_skipped(test_name, "In lettuce, we do know the reason", step.described_at) + else: + self._test_undefined(test_name, step.described_at) - def run(self): + def __install_hooks(self): """ - Launches runner + Installs required hooks """ - self.report_tests() - self.messages.testMatrixEntered() - lettuce.before.each_feature(lambda f: self.report_suite(True, f.name, f.described_at)) - lettuce.after.each_feature(lambda f: self.report_suite(False, f.name, f.described_at)) + # Install hooks + lettuce.before.each_feature( + lambda f: self._feature_or_scenario(True, f.name, f.described_at)) + lettuce.after.each_feature( + lambda f: self._feature_or_scenario(False, f.name, f.described_at)) - lettuce.before.each_scenario(lambda s: self.report_scenario_started(s)) - lettuce.after.each_scenario(lambda s: self.report_suite(False, s.name, s.described_at)) + lettuce.before.each_scenario( + lambda s: self.__scenario(True, s)) + lettuce.after.each_scenario( + lambda s: self.__scenario(False, s)) lettuce.before.each_background( - lambda b, *args: self.report_suite(True, "Scenario background", b.feature.described_at)) + lambda b, *args: self._background(True, b.feature.described_at)) lettuce.after.each_background( - lambda b, *args: self.report_suite(False, "Scenario background", b.feature.described_at)) + lambda b, *args: self._background(False, b.feature.described_at)) - lettuce.before.each_step(lambda s: self.report_step(True, s)) - lettuce.after.each_step(lambda s: self.report_step(False, s)) + lettuce.before.each_step(lambda s: self.__step(True, s)) + lettuce.after.each_step(lambda s: self.__step(False, s)) - self.runner.run() + def __scenario(self, is_started, scenario): + """ + Reports scenario launched + :type scenario core.Scenario + :param scenario: scenario + """ + if scenario.outlines: + scenario.steps = [] # Clear to prevent running. TODO: Fix when this issue fixed + scenario.background = None # TODO: undocumented + return + self._feature_or_scenario(is_started, scenario.name, scenario.described_at) if __name__ == "__main__": - path = sys.argv[1] if len(sys.argv) > 1 else "." - assert os.path.exists(path), "{} does not exist".format(path) - LettuceRunner(path).run()
\ No newline at end of file + (base_dir, what_to_run) = _bdd_utils.get_path_by_args(sys.argv) + _LettuceRunner(base_dir, what_to_run).run()
\ No newline at end of file diff --git a/python/helpers/pycharm/tcunittest.py b/python/helpers/pycharm/tcunittest.py index b6950c92a11f..99b30595a191 100644 --- a/python/helpers/pycharm/tcunittest.py +++ b/python/helpers/pycharm/tcunittest.py @@ -6,14 +6,16 @@ from tcmessages import TeamcityServiceMessages PYTHON_VERSION_MAJOR = sys.version_info[0] + def strclass(cls): if not cls.__name__: return cls.__module__ return "%s.%s" % (cls.__module__, cls.__name__) + def smart_str(s): - encoding='utf-8' - errors='strict' + encoding = 'utf-8' + errors = 'strict' if PYTHON_VERSION_MAJOR < 3: is_string = isinstance(s, basestring) else: @@ -33,6 +35,7 @@ def smart_str(s): else: return s + class TeamcityTestResult(TestResult): def __init__(self, stream=sys.stdout, *args, **kwargs): TestResult.__init__(self) @@ -41,42 +44,47 @@ class TeamcityTestResult(TestResult): self.output = stream self.messages = TeamcityServiceMessages(self.output, prepend_linebreak=True) self.messages.testMatrixEntered() + self.current_failed = False self.current_suite = None + self.subtest_suite = None def find_first(self, val): quot = val[0] count = 1 quote_ind = val[count:].find(quot) - while quote_ind != -1 and val[count+quote_ind-1] == "\\": + while quote_ind != -1 and val[count + quote_ind - 1] == "\\": count = count + quote_ind + 1 quote_ind = val[count:].find(quot) - return val[0:quote_ind+count+1] + return val[0:quote_ind + count + 1] def find_second(self, val): val_index = val.find("!=") if val_index != -1: count = 1 - val = val[val_index+2:].strip() + val = val[val_index + 2:].strip() quot = val[0] quote_ind = val[count:].find(quot) - while quote_ind != -1 and val[count+quote_ind-1] == "\\": + while quote_ind != -1 and val[count + quote_ind - 1] == "\\": count = count + quote_ind + 1 quote_ind = val[count:].find(quot) - return val[0:quote_ind+count+1] + return val[0:quote_ind + count + 1] else: quot = val[-1] - quote_ind = val[:len(val)-1].rfind(quot) - while quote_ind != -1 and val[quote_ind-1] == "\\": - quote_ind = val[:quote_ind-1].rfind(quot) + quote_ind = val[:len(val) - 1].rfind(quot) + while quote_ind != -1 and val[quote_ind - 1] == "\\": + quote_ind = val[:quote_ind - 1].rfind(quot) return val[quote_ind:] def formatErr(self, err): exctype, value, tb = err return ''.join(traceback.format_exception(exctype, value, tb)) - def getTestName(self, test): + def getTestName(self, test, is_subtest=False): + if is_subtest: + test_name = self.getTestName(test.test_case) + return "{} {}".format(test_name, test._subDescription()) if hasattr(test, '_testMethodName'): if test._testMethodName == "runTest": return str(test) @@ -95,10 +103,13 @@ class TeamcityTestResult(TestResult): TestResult.addSuccess(self, test) def addError(self, test, err): + self.init_suite(test) + self.current_failed = True TestResult.addError(self, test, err) err = self._exc_info_to_string(err, test) + self.messages.testStarted(self.getTestName(test)) self.messages.testError(self.getTestName(test), message='Error', details=err) @@ -108,6 +119,8 @@ class TeamcityTestResult(TestResult): return error_value.split('assert')[-1].strip() def addFailure(self, test, err): + self.init_suite(test) + self.current_failed = True TestResult.addFailure(self, test, err) error_value = smart_str(err[1]) @@ -119,7 +132,7 @@ class TeamcityTestResult(TestResult): self_find_second = self.find_second(error_value) quotes = ["'", '"'] if (self_find_first[0] == self_find_first[-1] and self_find_first[0] in quotes and - self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes): + self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes): # let's unescape strings to show sexy multiline diff in PyCharm. # By default all caret return chars are escaped by testing framework first = self._unescape(self_find_first) @@ -128,10 +141,13 @@ class TeamcityTestResult(TestResult): first = second = "" err = self._exc_info_to_string(err, test) + self.messages.testStarted(self.getTestName(test)) self.messages.testFailed(self.getTestName(test), message='Failure', details=err, expected=first, actual=second) def addSkip(self, test, reason): + self.init_suite(test) + self.current_failed = True self.messages.testIgnored(self.getTestName(test), message=reason) def __getSuite(self, test): @@ -149,10 +165,10 @@ class TeamcityTestResult(TestResult): try: source_file = inspect.getsourcefile(test.__class__) if source_file: - source_dir_splitted = source_file.split("/")[:-1] - source_dir = "/".join(source_dir_splitted) + "/" + source_dir_splitted = source_file.split("/")[:-1] + source_dir = "/".join(source_dir_splitted) + "/" else: - source_dir = "" + source_dir = "" except TypeError: source_dir = "" @@ -163,20 +179,52 @@ class TeamcityTestResult(TestResult): return (suite, location, suite_location) def startTest(self, test): + self.current_failed = False + setattr(test, "startTime", datetime.datetime.now()) + + def init_suite(self, test): suite, location, suite_location = self.__getSuite(test) if suite != self.current_suite: if self.current_suite: self.messages.testSuiteFinished(self.current_suite) self.current_suite = suite self.messages.testSuiteStarted(self.current_suite, location=suite_location) - setattr(test, "startTime", datetime.datetime.now()) - self.messages.testStarted(self.getTestName(test), location=location) + return location def stopTest(self, test): start = getattr(test, "startTime", datetime.datetime.now()) d = datetime.datetime.now() - start - duration=d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000 - self.messages.testFinished(self.getTestName(test), duration=int(duration)) + duration = d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000 + if not self.subtest_suite: + if not self.current_failed: + location = self.init_suite(test) + self.messages.testStarted(self.getTestName(test), location=location) + self.messages.testFinished(self.getTestName(test), duration=int(duration)) + else: + self.messages.testSuiteFinished(self.subtest_suite) + self.subtest_suite = None + + + def addSubTest(self, test, subtest, err): + suite_name = self.getTestName(test) # + " (subTests)" + if not self.subtest_suite: + self.subtest_suite = suite_name + self.messages.testSuiteStarted(self.subtest_suite) + else: + if suite_name != self.subtest_suite: + self.messages.testSuiteFinished(self.subtest_suite) + self.subtest_suite = suite_name + self.messages.testSuiteStarted(self.subtest_suite) + + name = self.getTestName(subtest, True) + if err is not None: + error = self._exc_info_to_string(err, test) + self.messages.testStarted(name) + self.messages.testFailed(name, message='Failure', details=error) + else: + self.messages.testStarted(name) + self.messages.testFinished(name) + def endLastSuite(self): if self.current_suite: @@ -187,6 +235,7 @@ class TeamcityTestResult(TestResult): # do not use text.decode('string_escape'), it leads to problems with different string encodings given return text.replace("\\n", "\n") + class TeamcityTestRunner(object): def __init__(self, stream=sys.stdout): self.stream = stream |