summaryrefslogtreecommitdiff
path: root/python/helpers/pycharm
diff options
context:
space:
mode:
Diffstat (limited to 'python/helpers/pycharm')
-rw-r--r--python/helpers/pycharm/_bdd_utils.py201
-rw-r--r--python/helpers/pycharm/behave_runner.py242
-rw-r--r--python/helpers/pycharm/lettuce_runner.py158
-rw-r--r--python/helpers/pycharm/tcunittest.py87
4 files changed, 580 insertions, 108 deletions
diff --git a/python/helpers/pycharm/_bdd_utils.py b/python/helpers/pycharm/_bdd_utils.py
new file mode 100644
index 000000000000..0c92532b516c
--- /dev/null
+++ b/python/helpers/pycharm/_bdd_utils.py
@@ -0,0 +1,201 @@
+# coding=utf-8
+"""
+Tools for running BDD frameworks in python.
+You probably need to extend BddRunner (see its doc).
+
+You may also need "get_path_by_args" that gets folder (current or passed as first argument)
+"""
+import os
+import time
+import abc
+
+import tcmessages
+
+
+__author__ = 'Ilya.Kazakevich'
+
+
+def get_path_by_args(arguments):
+ """
+ :type arguments list
+ :param arguments: arguments (sys.argv)
+ :return: tuple (base_dir, what_to_run) where dir is current or first argument from argv, checking it exists
+ :rtype tuple of str
+ """
+ what_to_run = arguments[1] if len(arguments) > 1 else "."
+ base_dir = what_to_run
+ assert os.path.exists(what_to_run), "{} does not exist".format(what_to_run)
+
+ if os.path.isfile(what_to_run):
+ base_dir = os.path.dirname(what_to_run) # User may point to the file directly
+ return base_dir, what_to_run
+
+
+class BddRunner(object):
+ """
+ Extends this class, implement abstract methods and use its API to implement new BDD frameworks.
+ Call "run()" to launch it.
+ This class does the following:
+ * Gets features to run (using "_get_features_to_run()") and calculates steps in it
+ * Reports steps to Intellij or TC
+ * Calls "_run_tests()" where *you* should install all hooks you need into your BDD and use "self._" functions
+ to report tests and features. It actually wraps tcmessages but adds some stuff like duration count etc
+ :param base_dir:
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, base_dir):
+ """
+ :type base_dir str
+ :param base_dir base directory of your project
+ """
+ super(BddRunner, self).__init__()
+ self.tc_messages = tcmessages.TeamcityServiceMessages()
+ """
+ tcmessages TeamCity/Intellij test API. See TeamcityServiceMessages
+ """
+ self.__base_dir = base_dir
+ self.__last_test_start_time = None # TODO: Doc when use
+ self.__last_test_name = None
+
+ def run(self):
+ """"
+ Runs runner. To be called right after constructor.
+ """
+ self.tc_messages.testCount(self._get_number_of_tests())
+ self.tc_messages.testMatrixEntered()
+ self._run_tests()
+
+ def __gen_location(self, location):
+ """
+ Generates location in format, supported by tcmessages
+ :param location object with "file" (relative to base_dir) and "line" fields.
+ :return: location in format file:line (as supported in tcmessages)
+ """
+ my_file = str(location.file).lstrip("/\\")
+ return "file:///{}:{}".format(os.path.normpath(os.path.join(self.__base_dir, my_file)), location.line)
+
+ def _test_undefined(self, test_name, location):
+ """
+ Mark test as undefined
+ :param test_name: name of test
+ :type test_name str
+ :param location its location
+
+ """
+ if test_name != self.__last_test_name:
+ self._test_started(test_name, location)
+ self._test_failed(test_name, message="Test undefined", details="Please define test")
+
+ def _test_skipped(self, test_name, reason, location):
+ """
+ Mark test as skipped
+ :param test_name: name of test
+ :param reason: why test was skipped
+ :type reason str
+ :type test_name str
+ :param location its location
+
+ """
+ if test_name != self.__last_test_name:
+ self._test_started(test_name, location)
+ self.tc_messages.testIgnored(test_name, "Skipped: {}".format(reason))
+ self.__last_test_name = None
+ pass
+
+ def _test_failed(self, name, message, details):
+ """
+ Report test failure
+ :param name: test name
+ :type name str
+ :param message: failure message
+ :type message str
+ :param details: failure details (probably stacktrace)
+ :type details str
+ """
+ self.tc_messages.testFailed(name, message=message, details=details)
+ self.__last_test_name = None
+
+ def _test_passed(self, name, duration=None):
+ """
+ Reports test passed
+ :param name: test name
+ :type name str
+ :param duration: time (in seconds) test took. Pass None if you do not know (we'll try to calculate it)
+ :type duration int
+ :return:
+ """
+ duration_to_report = duration
+ if self.__last_test_start_time and not duration: # And not provided
+ duration_to_report = int(time.time() - self.__last_test_start_time)
+ self.tc_messages.testFinished(name, duration=int(duration_to_report))
+ self.__last_test_start_time = None
+ self.__last_test_name = None
+
+ def _test_started(self, name, location):
+ """
+ Reports test launched
+ :param name: test name
+ :param location object with "file" (relative to base_dir) and "line" fields.
+ :type name str
+ """
+ self.__last_test_start_time = time.time()
+ self.__last_test_name = name
+ self.tc_messages.testStarted(name, self.__gen_location(location))
+
+ def _feature_or_scenario(self, is_started, name, location):
+ """
+ Reports feature or scenario launched or stopped
+ :param is_started: started or finished?
+ :type is_started bool
+ :param name: scenario or feature name
+ :param location object with "file" (relative to base_dir) and "line" fields.
+ """
+ if is_started:
+ self.tc_messages.testSuiteStarted(name, self.__gen_location(location))
+ else:
+ self.tc_messages.testSuiteFinished(name)
+
+ def _background(self, is_started, location):
+ """
+ Reports background or stopped
+ :param is_started: started or finished?
+ :type is_started bool
+ :param location object with "file" (relative to base_dir) and "line" fields.
+ """
+ self._feature_or_scenario(is_started, "Background", location)
+
+ def _get_number_of_tests(self):
+ """"
+ Gets number of tests using "_get_features_to_run()" to obtain number of features to calculate.
+ Supports backgrounds as well.
+ :return number of steps
+ :rtype int
+ """
+ num_of_steps = 0
+ for feature in self._get_features_to_run():
+ if feature.background:
+ num_of_steps += len(feature.background.steps) * len(feature.scenarios)
+ for scenario in feature.scenarios:
+ num_of_steps += len(scenario.steps)
+ return num_of_steps
+
+ @abc.abstractmethod
+ def _get_features_to_run(self):
+ """
+ Implement it! Return list of features to run. Each "feature" should have "scenarios".
+ Each "scenario" should have "steps". Each "feature" may have "background" and each "background" should have
+ "steps". Duck typing.
+ :rtype list
+ :returns list of features
+ """
+ return []
+
+ @abc.abstractmethod
+ def _run_tests(self):
+ """
+ Implement it! It should launch tests using your BDD. Use "self._" functions to report results.
+ """
+ pass
+
+
diff --git a/python/helpers/pycharm/behave_runner.py b/python/helpers/pycharm/behave_runner.py
new file mode 100644
index 000000000000..4a1b2f6557c5
--- /dev/null
+++ b/python/helpers/pycharm/behave_runner.py
@@ -0,0 +1,242 @@
+# coding=utf-8
+"""
+Behave BDD runner.
+*FIRST* param now: folder to search "features" for.
+Each "features" folder should have features and "steps" subdir.
+
+Other args are tag expressionsin format (--tags=.. --tags=..).
+See https://pythonhosted.org/behave/behave.html#tag-expression
+"""
+import functools
+import sys
+import os
+import traceback
+
+from behave.formatter.base import Formatter
+from behave.model import Step, ScenarioOutline, Feature, Scenario
+from behave.tag_expression import TagExpression
+
+import _bdd_utils
+
+
+_MAX_STEPS_SEARCH_FEATURES = 5000 # Do not look for features in folder that has more that this number of children
+_FEATURES_FOLDER = 'features' # "features" folder name.
+
+__author__ = 'Ilya.Kazakevich'
+
+from behave import configuration, runner
+from behave.formatter import formatters
+
+
+def _get_dirs_to_run(base_dir_to_search):
+ """
+ Searches for "features" dirs in some base_dir
+ :return: list of feature dirs to run
+ :rtype: list
+ :param base_dir_to_search root directory to search (should not have too many children!)
+ :type base_dir_to_search str
+
+ """
+ result = set()
+ for (step, (folder, sub_folders, files)) in enumerate(os.walk(base_dir_to_search)):
+ if os.path.basename(folder) == _FEATURES_FOLDER and os.path.isdir(folder):
+ result.add(os.path.abspath(folder))
+ if step == _MAX_STEPS_SEARCH_FEATURES: # Guard
+ err = "Folder {} is too deep to find any features folder. Please provider concrete folder".format(
+ base_dir_to_search)
+ raise Exception(err)
+ return list(result)
+
+
+def _merge_hooks_wrapper(*hooks):
+ """
+ Creates wrapper that runs provided behave hooks sequentally
+ :param hooks: hooks to run
+ :return: wrapper
+ """
+ # TODO: Wheel reinvented!!!!
+ def wrapper(*args, **kwargs):
+ for hook in hooks:
+ hook(*args, **kwargs)
+
+ return wrapper
+
+
+class _RunnerWrapper(runner.Runner):
+ """
+ Wrapper around behave native wrapper. Has nothing todo with BddRunner!
+ We need it to support dry runs (to fetch data from scenarios) and hooks api
+ """
+
+ def __init__(self, config, hooks):
+ """
+ :type config configuration.Configuration
+ :param config behave configuration
+ :type hooks dict
+ :param hooks hooks in format "before_scenario" => f(context, scenario) to load after/before hooks, provided by user
+ """
+ super(_RunnerWrapper, self).__init__(config)
+ self.dry_run = False
+ """
+ Does not run tests (only fetches "self.features") if true. Runs tests otherwise.
+ """
+ self.__hooks = hooks
+
+ def load_hooks(self, filename='environment.py'):
+ """
+ Overrides parent "load_hooks" to add "self.__hooks"
+ :param filename: env. file name
+ """
+ super(_RunnerWrapper, self).load_hooks(filename)
+ for (hook_name, hook) in self.__hooks.items():
+ hook_to_add = hook
+ if hook_name in self.hooks:
+ user_hook = self.hooks[hook_name]
+ if hook_name.startswith("before"):
+ user_and_custom_hook = [user_hook, hook]
+ else:
+ user_and_custom_hook = [hook, user_hook]
+ hook_to_add = _merge_hooks_wrapper(*user_and_custom_hook)
+ self.hooks[hook_name] = hook_to_add
+
+ def run_model(self, features=None):
+ """
+ Overrides parent method to stop (do nothing) in case of "dry_run"
+ :param features: features to run
+ :return:
+ """
+ if self.dry_run: # To stop further execution
+ return
+ return super(_RunnerWrapper, self).run_model(features)
+
+ def clean(self):
+ """
+ Cleans runner after dry run (clears hooks, features etc). To be called before real run!
+ """
+ self.dry_run = False
+ self.hooks.clear()
+ self.features = []
+
+
+class _BehaveRunner(_bdd_utils.BddRunner):
+ """
+ BddRunner for behave
+ """
+
+
+ def __process_hook(self, is_started, context, element):
+ """
+ Hook to be installed. Reports steps, features etc.
+ :param is_started true if test/feature/scenario is started
+ :type is_started bool
+ :param context behave context
+ :type context behave.runner.Context
+ :param element feature/suite/step
+ """
+ element.location.file = element.location.filename # To preserve _bdd_utils contract
+ if isinstance(element, Step):
+ # Process step
+ if is_started:
+ self._test_started(element.name, element.location)
+ elif element.status == 'passed':
+ self._test_passed(element.name, element.duration)
+ elif element.status == 'failed':
+ try:
+ trace = traceback.format_exc()
+ except Exception:
+ trace = "".join(traceback.format_tb(element.exc_traceback))
+ self._test_failed(element.name, element.error_message, trace)
+ elif element.status == 'undefined':
+ self._test_undefined(element.name, element.location)
+ else:
+ self._test_skipped(element.name, element.status, element.location)
+ elif not is_started and isinstance(element, Scenario) and element.status == 'failed':
+ # To process scenarios with undefined/skipped tests
+ for step in element.steps:
+ assert isinstance(step, Step), step
+ if step.status not in ['passed', 'failed']: # Something strange, probably skipped or undefined
+ self.__process_hook(False, context, step)
+ self._feature_or_scenario(is_started, element.name, element.location)
+ elif isinstance(element, ScenarioOutline):
+ self._feature_or_scenario(is_started, str(element.examples), element.location)
+ else:
+ self._feature_or_scenario(is_started, element.name, element.location)
+
+ def __init__(self, config, base_dir):
+ """
+ :type config configuration.Configuration
+ """
+ super(_BehaveRunner, self).__init__(base_dir)
+ self.__config = config
+ # Install hooks
+ self.__real_runner = _RunnerWrapper(config, {
+ "before_feature": functools.partial(self.__process_hook, True),
+ "after_feature": functools.partial(self.__process_hook, False),
+ "before_scenario": functools.partial(self.__process_hook, True),
+ "after_scenario": functools.partial(self.__process_hook, False),
+ "before_step": functools.partial(self.__process_hook, True),
+ "after_step": functools.partial(self.__process_hook, False)
+ })
+
+ def _run_tests(self):
+ self.__real_runner.run()
+
+
+ def __filter_scenarios_by_tag(self, scenario):
+ """
+ Filters out scenarios that should be skipped by tags
+ :param scenario scenario to check
+ :return true if should pass
+ """
+ assert isinstance(scenario, Scenario), scenario
+ expected_tags = self.__config.tags
+ if not expected_tags:
+ return True # No tags are required
+ return isinstance(expected_tags, TagExpression) and expected_tags.check(scenario.tags)
+
+
+ def _get_features_to_run(self):
+ self.__real_runner.dry_run = True
+ self.__real_runner.run()
+ features_to_run = self.__real_runner.features
+ self.__real_runner.clean() # To make sure nothing left after dry run
+
+ # Change outline scenario skeletons with real scenarios
+ for feature in features_to_run:
+ assert isinstance(feature, Feature), feature
+ scenarios = []
+ for scenario in feature.scenarios:
+ if isinstance(scenario, ScenarioOutline):
+ scenarios.extend(scenario.scenarios)
+ else:
+ scenarios.append(scenario)
+ feature.scenarios = filter(self.__filter_scenarios_by_tag, scenarios)
+
+ return features_to_run
+
+
+if __name__ == "__main__":
+ # TODO: support all other params instead
+
+ class _Null(Formatter):
+ """
+ Null formater to prevent stdout output
+ """
+ pass
+
+ command_args = list(filter(None, sys.argv[1:]))
+ my_config = configuration.Configuration(command_args=command_args)
+ formatters.register_as(_Null, "com.intellij.python.null")
+ my_config.format = ["com.intellij.python.null"] # To prevent output to stdout
+ my_config.reporters = [] # To prevent summary to stdout
+ my_config.stdout_capture = False # For test output
+ my_config.stderr_capture = False # For test output
+ (base_dir, what_to_run) = _bdd_utils.get_path_by_args(sys.argv)
+ if not my_config.paths: # No path provided, trying to load dit manually
+ if os.path.isfile(what_to_run): # File is provided, load it
+ my_config.paths = [what_to_run]
+ else: # Dir is provided, find subdirs ro run
+ my_config.paths = _get_dirs_to_run(base_dir)
+ _BehaveRunner(my_config, base_dir).run()
+
+
diff --git a/python/helpers/pycharm/lettuce_runner.py b/python/helpers/pycharm/lettuce_runner.py
index 6aaa566df719..3cd112540e5f 100644
--- a/python/helpers/pycharm/lettuce_runner.py
+++ b/python/helpers/pycharm/lettuce_runner.py
@@ -1,132 +1,112 @@
# coding=utf-8
"""
BDD lettuce framework runner
+TODO: Support other params (like tags) as well.
+Supports only 1 param now: folder to search "features" for.
"""
+import _bdd_utils
+
__author__ = 'Ilya.Kazakevich'
-import os
from lettuce.exceptions import ReasonToFail
-import time
import sys
-import tcmessages
import lettuce
from lettuce import core
-# Error message about unsupported outlines
-_NO_OUTLINE_ERROR = "Outline scenarios are not supported due to https://github.com/gabrielfalcao/lettuce/issues/451"
-
-
-class LettuceRunner(object):
+class _LettuceRunner(_bdd_utils.BddRunner):
"""
- TODO: Runs lettuce
+ Lettuce runner (BddRunner for lettuce)
"""
- def __init__(self, base_dir):
+ def __init__(self, base_dir, what_to_run):
"""
+
:param base_dir base directory to run tests in
:type base_dir: str
-
- """
- self.base_dir = base_dir
- self.runner = lettuce.Runner(base_dir)
- self.messages = tcmessages.TeamcityServiceMessages()
- self.test_start_time = None
-
- def report_tests(self):
- """
- :returns : number of tests
- :rtype : int
- """
- result = 0
- for feature_file in self.runner.loader.find_feature_files():
- feature = core.Feature.from_file(feature_file)
- for scenario in feature.scenarios:
- assert isinstance(scenario, core.Scenario), scenario
- if not scenario.outlines:
- result += len(scenario.steps)
- self.messages.testCount(result)
-
- def report_scenario_started(self, scenario):
+ :param what_to_run folder or file to run
+ :type what_to_run str
"""
- Reports scenario launched
- :type scenario core.Scenario
- :param scenario: scenario
- """
- if scenario.outlines:
- self.messages.testIgnored(scenario.name,
- _NO_OUTLINE_ERROR)
- scenario.steps = [] # Clear to prevent running. TODO: Fix when this issue fixed
- scenario.background = None # TODO: undocumented
- return
- self.report_suite(True, scenario.name, scenario.described_at)
+ super(_LettuceRunner, self).__init__(base_dir)
+ self.__runner = lettuce.Runner(what_to_run)
- def report_suite(self, is_start, name, described_at):
- """
- Reports some suite (scenario, feature, background etc) is started or stopped
- :param is_start: started or not
- :param name: suite name
- :param described_at: where it is described (file, line)
- :return:
- """
- if is_start:
- self.messages.testSuiteStarted(name, self._gen_location(described_at))
- else:
- self.messages.testSuiteFinished(name)
+ def _get_features_to_run(self):
+ super(_LettuceRunner, self)._get_features_to_run()
+ if self.__runner.single_feature: # We need to run one and only one feature
+ return [core.Feature.from_file(self.__runner.single_feature)]
- def report_step(self, is_start, step):
+ # Find all features in dir
+ features = []
+ for feature_file in self.__runner.loader.find_feature_files():
+ feature = core.Feature.from_file(feature_file)
+ assert isinstance(feature, core.Feature), feature
+ # TODO: cut out due to https://github.com/gabrielfalcao/lettuce/issues/451 Fix when this issue fixed
+ feature.scenarios = filter(lambda s: not s.outlines, feature.scenarios)
+ if feature.scenarios:
+ features.append(feature)
+ return features
+
+ def _run_tests(self):
+ super(_LettuceRunner, self)._run_tests()
+ self.__install_hooks()
+ self.__runner.run()
+
+ def __step(self, is_started, step):
"""
Reports step start / stop
- :param is_start: true if step started
:type step core.Step
:param step: step
"""
test_name = step.sentence
- if is_start:
- self.test_start_time = time.time()
- self.messages.testStarted(test_name, self._gen_location(step.described_at))
+ if is_started:
+ self._test_started(test_name, step.described_at)
elif step.passed:
- duration = 0
- if self.test_start_time:
- duration = long(time.time() - self.test_start_time)
- self.messages.testFinished(test_name, duration=duration)
- self.test_start_time = None
+ self._test_passed(test_name)
elif step.failed:
reason = step.why
assert isinstance(reason, ReasonToFail), reason
- self.messages.testFailed(test_name, message=reason.exception, details=reason.traceback)
-
- def _gen_location(self, description):
- """
- :param description: "described_at" (file, line)
- :return: location in format file:line by "described_at"
- """
- return "file:///{}/{}:{}".format(self.base_dir, description.file, description.line)
+ self._test_failed(test_name, message=reason.exception, details=reason.traceback)
+ elif step.has_definition:
+ self._test_skipped(test_name, "In lettuce, we do know the reason", step.described_at)
+ else:
+ self._test_undefined(test_name, step.described_at)
- def run(self):
+ def __install_hooks(self):
"""
- Launches runner
+ Installs required hooks
"""
- self.report_tests()
- self.messages.testMatrixEntered()
- lettuce.before.each_feature(lambda f: self.report_suite(True, f.name, f.described_at))
- lettuce.after.each_feature(lambda f: self.report_suite(False, f.name, f.described_at))
+ # Install hooks
+ lettuce.before.each_feature(
+ lambda f: self._feature_or_scenario(True, f.name, f.described_at))
+ lettuce.after.each_feature(
+ lambda f: self._feature_or_scenario(False, f.name, f.described_at))
- lettuce.before.each_scenario(lambda s: self.report_scenario_started(s))
- lettuce.after.each_scenario(lambda s: self.report_suite(False, s.name, s.described_at))
+ lettuce.before.each_scenario(
+ lambda s: self.__scenario(True, s))
+ lettuce.after.each_scenario(
+ lambda s: self.__scenario(False, s))
lettuce.before.each_background(
- lambda b, *args: self.report_suite(True, "Scenario background", b.feature.described_at))
+ lambda b, *args: self._background(True, b.feature.described_at))
lettuce.after.each_background(
- lambda b, *args: self.report_suite(False, "Scenario background", b.feature.described_at))
+ lambda b, *args: self._background(False, b.feature.described_at))
- lettuce.before.each_step(lambda s: self.report_step(True, s))
- lettuce.after.each_step(lambda s: self.report_step(False, s))
+ lettuce.before.each_step(lambda s: self.__step(True, s))
+ lettuce.after.each_step(lambda s: self.__step(False, s))
- self.runner.run()
+ def __scenario(self, is_started, scenario):
+ """
+ Reports scenario launched
+ :type scenario core.Scenario
+ :param scenario: scenario
+ """
+ if scenario.outlines:
+ scenario.steps = [] # Clear to prevent running. TODO: Fix when this issue fixed
+ scenario.background = None # TODO: undocumented
+ return
+ self._feature_or_scenario(is_started, scenario.name, scenario.described_at)
if __name__ == "__main__":
- path = sys.argv[1] if len(sys.argv) > 1 else "."
- assert os.path.exists(path), "{} does not exist".format(path)
- LettuceRunner(path).run() \ No newline at end of file
+ (base_dir, what_to_run) = _bdd_utils.get_path_by_args(sys.argv)
+ _LettuceRunner(base_dir, what_to_run).run() \ No newline at end of file
diff --git a/python/helpers/pycharm/tcunittest.py b/python/helpers/pycharm/tcunittest.py
index b6950c92a11f..99b30595a191 100644
--- a/python/helpers/pycharm/tcunittest.py
+++ b/python/helpers/pycharm/tcunittest.py
@@ -6,14 +6,16 @@ from tcmessages import TeamcityServiceMessages
PYTHON_VERSION_MAJOR = sys.version_info[0]
+
def strclass(cls):
if not cls.__name__:
return cls.__module__
return "%s.%s" % (cls.__module__, cls.__name__)
+
def smart_str(s):
- encoding='utf-8'
- errors='strict'
+ encoding = 'utf-8'
+ errors = 'strict'
if PYTHON_VERSION_MAJOR < 3:
is_string = isinstance(s, basestring)
else:
@@ -33,6 +35,7 @@ def smart_str(s):
else:
return s
+
class TeamcityTestResult(TestResult):
def __init__(self, stream=sys.stdout, *args, **kwargs):
TestResult.__init__(self)
@@ -41,42 +44,47 @@ class TeamcityTestResult(TestResult):
self.output = stream
self.messages = TeamcityServiceMessages(self.output, prepend_linebreak=True)
self.messages.testMatrixEntered()
+ self.current_failed = False
self.current_suite = None
+ self.subtest_suite = None
def find_first(self, val):
quot = val[0]
count = 1
quote_ind = val[count:].find(quot)
- while quote_ind != -1 and val[count+quote_ind-1] == "\\":
+ while quote_ind != -1 and val[count + quote_ind - 1] == "\\":
count = count + quote_ind + 1
quote_ind = val[count:].find(quot)
- return val[0:quote_ind+count+1]
+ return val[0:quote_ind + count + 1]
def find_second(self, val):
val_index = val.find("!=")
if val_index != -1:
count = 1
- val = val[val_index+2:].strip()
+ val = val[val_index + 2:].strip()
quot = val[0]
quote_ind = val[count:].find(quot)
- while quote_ind != -1 and val[count+quote_ind-1] == "\\":
+ while quote_ind != -1 and val[count + quote_ind - 1] == "\\":
count = count + quote_ind + 1
quote_ind = val[count:].find(quot)
- return val[0:quote_ind+count+1]
+ return val[0:quote_ind + count + 1]
else:
quot = val[-1]
- quote_ind = val[:len(val)-1].rfind(quot)
- while quote_ind != -1 and val[quote_ind-1] == "\\":
- quote_ind = val[:quote_ind-1].rfind(quot)
+ quote_ind = val[:len(val) - 1].rfind(quot)
+ while quote_ind != -1 and val[quote_ind - 1] == "\\":
+ quote_ind = val[:quote_ind - 1].rfind(quot)
return val[quote_ind:]
def formatErr(self, err):
exctype, value, tb = err
return ''.join(traceback.format_exception(exctype, value, tb))
- def getTestName(self, test):
+ def getTestName(self, test, is_subtest=False):
+ if is_subtest:
+ test_name = self.getTestName(test.test_case)
+ return "{} {}".format(test_name, test._subDescription())
if hasattr(test, '_testMethodName'):
if test._testMethodName == "runTest":
return str(test)
@@ -95,10 +103,13 @@ class TeamcityTestResult(TestResult):
TestResult.addSuccess(self, test)
def addError(self, test, err):
+ self.init_suite(test)
+ self.current_failed = True
TestResult.addError(self, test, err)
err = self._exc_info_to_string(err, test)
+ self.messages.testStarted(self.getTestName(test))
self.messages.testError(self.getTestName(test),
message='Error', details=err)
@@ -108,6 +119,8 @@ class TeamcityTestResult(TestResult):
return error_value.split('assert')[-1].strip()
def addFailure(self, test, err):
+ self.init_suite(test)
+ self.current_failed = True
TestResult.addFailure(self, test, err)
error_value = smart_str(err[1])
@@ -119,7 +132,7 @@ class TeamcityTestResult(TestResult):
self_find_second = self.find_second(error_value)
quotes = ["'", '"']
if (self_find_first[0] == self_find_first[-1] and self_find_first[0] in quotes and
- self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes):
+ self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes):
# let's unescape strings to show sexy multiline diff in PyCharm.
# By default all caret return chars are escaped by testing framework
first = self._unescape(self_find_first)
@@ -128,10 +141,13 @@ class TeamcityTestResult(TestResult):
first = second = ""
err = self._exc_info_to_string(err, test)
+ self.messages.testStarted(self.getTestName(test))
self.messages.testFailed(self.getTestName(test),
message='Failure', details=err, expected=first, actual=second)
def addSkip(self, test, reason):
+ self.init_suite(test)
+ self.current_failed = True
self.messages.testIgnored(self.getTestName(test), message=reason)
def __getSuite(self, test):
@@ -149,10 +165,10 @@ class TeamcityTestResult(TestResult):
try:
source_file = inspect.getsourcefile(test.__class__)
if source_file:
- source_dir_splitted = source_file.split("/")[:-1]
- source_dir = "/".join(source_dir_splitted) + "/"
+ source_dir_splitted = source_file.split("/")[:-1]
+ source_dir = "/".join(source_dir_splitted) + "/"
else:
- source_dir = ""
+ source_dir = ""
except TypeError:
source_dir = ""
@@ -163,20 +179,52 @@ class TeamcityTestResult(TestResult):
return (suite, location, suite_location)
def startTest(self, test):
+ self.current_failed = False
+ setattr(test, "startTime", datetime.datetime.now())
+
+ def init_suite(self, test):
suite, location, suite_location = self.__getSuite(test)
if suite != self.current_suite:
if self.current_suite:
self.messages.testSuiteFinished(self.current_suite)
self.current_suite = suite
self.messages.testSuiteStarted(self.current_suite, location=suite_location)
- setattr(test, "startTime", datetime.datetime.now())
- self.messages.testStarted(self.getTestName(test), location=location)
+ return location
def stopTest(self, test):
start = getattr(test, "startTime", datetime.datetime.now())
d = datetime.datetime.now() - start
- duration=d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000
- self.messages.testFinished(self.getTestName(test), duration=int(duration))
+ duration = d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000
+ if not self.subtest_suite:
+ if not self.current_failed:
+ location = self.init_suite(test)
+ self.messages.testStarted(self.getTestName(test), location=location)
+ self.messages.testFinished(self.getTestName(test), duration=int(duration))
+ else:
+ self.messages.testSuiteFinished(self.subtest_suite)
+ self.subtest_suite = None
+
+
+ def addSubTest(self, test, subtest, err):
+ suite_name = self.getTestName(test) # + " (subTests)"
+ if not self.subtest_suite:
+ self.subtest_suite = suite_name
+ self.messages.testSuiteStarted(self.subtest_suite)
+ else:
+ if suite_name != self.subtest_suite:
+ self.messages.testSuiteFinished(self.subtest_suite)
+ self.subtest_suite = suite_name
+ self.messages.testSuiteStarted(self.subtest_suite)
+
+ name = self.getTestName(subtest, True)
+ if err is not None:
+ error = self._exc_info_to_string(err, test)
+ self.messages.testStarted(name)
+ self.messages.testFailed(name, message='Failure', details=error)
+ else:
+ self.messages.testStarted(name)
+ self.messages.testFinished(name)
+
def endLastSuite(self):
if self.current_suite:
@@ -187,6 +235,7 @@ class TeamcityTestResult(TestResult):
# do not use text.decode('string_escape'), it leads to problems with different string encodings given
return text.replace("\\n", "\n")
+
class TeamcityTestRunner(object):
def __init__(self, stream=sys.stdout):
self.stream = stream