"""Nose plugin and test runner that report results as TeamCity service messages."""
import sys
import traceback
import datetime
import unittest

from tcmessages import TeamcityServiceMessages
from tcunittest import strclass
from tcunittest import TeamcityTestResult

try:
    from nose.util import isclass  # backwards compat
    from nose.config import Config
    from nose.result import TextTestResult
    from nose import SkipTest
    from nose.plugins.errorclass import ErrorClassPlugin
except Exception:
    # sys.exc_info() instead of "except ... as e" keeps old interpreters working.
    e = sys.exc_info()[1]
    raise NameError(
        "Something went wrong, do you have nosetest installed? I got this error: %s" % e)


class TeamcityPlugin(ErrorClassPlugin, TextTestResult, TeamcityTestResult):
    """
    TeamcityTest plugin for nose tests
    """
    name = "TeamcityPlugin"
    enabled = True

    def __init__(self, stream=sys.stderr, descriptions=None, verbosity=1,
                 config=None, errorClasses=None):
        """Set up the message writer and initialise the result base classes.

        :param stream: stream the service messages are written to.
        :param descriptions: forwarded to ``TextTestResult.__init__``.
        :param verbosity: forwarded to ``TextTestResult.__init__``.
        :param config: nose ``Config``; a default one is created when None.
        :param errorClasses: error class mapping; an empty dict when None.
        """
        super(TeamcityPlugin, self).__init__()

        if errorClasses is None:
            errorClasses = {}
        self.errorClasses = errorClasses

        if config is None:
            config = Config()
        self.config = config

        self.output = stream
        self.messages = TeamcityServiceMessages(self.output,
                                                prepend_linebreak=True)
        self.messages.testMatrixEntered()
        # Name of the suite currently open in the report, or None.
        self.current_suite = None

        TextTestResult.__init__(self, stream, descriptions, verbosity, config,
                                errorClasses)
        TeamcityTestResult.__init__(self, stream)

    def configure(self, options, conf):
        """Remember *conf* on the plugin unless configuration is disabled."""
        if not self.can_configure:
            return
        self.conf = conf

    def addError(self, test, err):
        """Report an error, or an ignored test when the error is a skip.

        :param test: the test that raised.
        :param err: ``(exctype, value, traceback)`` triple.
        """
        exctype, value, tb = err
        # issubclass (rather than ==) so subclasses of SkipTest are also
        # reported as ignored; isclass guards against non-class exc types.
        if isclass(exctype) and issubclass(exctype, SkipTest):
            self.messages.testIgnored(self.getTestName(test), message='Skip')
        else:
            self.messages.testError(self.getTestName(test),
                                    message='Error',
                                    details=self.formatErr(err))

    def formatErr(self, err):
        """Return *err* (an exc_info triple) formatted as a traceback string."""
        exctype, value, tb = err
        if isinstance(value, str):
            # Best effort: wrap a plain string value in its exception type.
            try:
                value = exctype(value)
            except TypeError:
                pass
        return ''.join(traceback.format_exception(exctype, value, tb))

    def is_gen(self, test):
        """Return True when ``test.test`` has a non-None ``descriptor``."""
        if hasattr(test, "test") and hasattr(test.test, "descriptor"):
            if test.test.descriptor is not None:
                return True
        return False

    def getTestName(self, test):
        """Return the name under which *test* is reported.

        Uses ``test.error_context`` when present; otherwise derives the name
        from ``str(test)``: unchanged for tests with a descriptor, else the
        part before the last ``(``, else the part after the last ``.``.
        """
        if hasattr(test, "error_context"):
            return test.error_context

        test_name_full = str(test)
        if self.is_gen(test):
            return test_name_full

        paren = test_name_full.rfind('(')
        if paren != -1:
            return test_name_full[:paren]

        dot = test_name_full.rfind('.')
        if dot != -1:
            return test_name_full[dot + 1:]
        return test_name_full

    def addFailure(self, test, err):
        """Report a failed test together with its formatted traceback."""
        err = self.formatErr(err)
        self.messages.testFailed(self.getTestName(test),
                                 message='Failure', details=err)

    def addSkip(self, test, reason):
        """Report *test* as ignored, using *reason* as the message."""
        self.messages.testIgnored(self.getTestName(test), message=reason)

    def __getSuite(self, test):
        """Return ``(location, suite_location)`` for *test*.

        Tests with a ``suite`` attribute use the locations stored on the
        suite. Otherwise ``file://`` locations are built from the test's
        source file and line number, falling back to ``python_nosetestid://``
        locations derived from ``test.id()`` when that fails.
        """
        if hasattr(test, "suite"):
            suite_location = test.suite.location
            location = test.suite.abs_location
            if hasattr(test, "lineno"):
                location = location + ":" + str(test.lineno)
            else:
                location = location + ":" + str(test.test.lineno)
        else:
            try:
                from nose.util import func_lineno

                if hasattr(test.test, "descriptor") and test.test.descriptor:
                    target = test.test.descriptor
                else:
                    target = test.test.test
                # test_address may return None; the concatenation then raises
                # TypeError and the fallback below is used.
                suite_location = "file://" + self.test_address(target)
                location = suite_location + ":" + str(func_lineno(target))
            except Exception:
                # Any failure to resolve a file location falls back to the
                # test id. KeyboardInterrupt/SystemExit are left to propagate.
                test_id = test.id()
                suite_id = test_id[:test_id.rfind(".")]
                suite_location = "python_nosetestid://" + str(suite_id)
                location = "python_nosetestid://" + str(test_id)
        return (location, suite_location)

    def test_address(self, test):
        """Return the source file path of *test*.

        :returns: ``test.address()[0]`` when *test* has an ``address``
            method; otherwise the absolute path of the module defining
            *test* (with a trailing ``pyc`` mapped to ``py``), or None when
            that module has no ``__file__``.
        :raises TypeError: when *test* is neither a function nor a class.
        """
        if hasattr(test, "address"):
            return test.address()[0]

        import os
        import types

        t = type(test)
        if (t == types.FunctionType or issubclass(t, type) or t == type
                or isclass(test)):
            path = None
            module = getattr(test, '__module__', None)
            if module is not None:
                m = sys.modules[module]
                path = getattr(m, '__file__', None)
            # Only normalise when a path was found; the module may have no
            # __file__ (e.g. built-in modules).
            if path is not None:
                path = os.path.abspath(path)
                if path.endswith("pyc"):
                    path = path[:-1]
            return path
        raise TypeError("I don't know what %s is (%s)" % (test, t))

    def getSuiteName(self, test):
        """Return the suite name for *test*, derived from ``str(test)``.

        For tests with a descriptor whose name contains both ``(`` and ``.``,
        the part before the last ``.`` is used. Otherwise the text inside the
        trailing parentheses is used, then the part before the last ``.``,
        then the full name.
        """
        test_name_full = str(test)
        paren = test_name_full.rfind('(')
        dot = test_name_full.rfind('.')

        if self.is_gen(test) and paren != -1 and dot != -1:
            return test_name_full[:dot]

        if paren != -1:
            return test_name_full[paren + 1: -1]
        if dot != -1:
            return test_name_full[:dot]
        return test_name_full

    def startTest(self, test):
        """Report the start of *test*, opening a new suite when it changes."""
        location, suite_location = self.__getSuite(test)
        suite = self.getSuiteName(test)
        if suite != self.current_suite:
            if self.current_suite:
                self.messages.testSuiteFinished(self.current_suite)
            self.current_suite = suite
            self.messages.testSuiteStarted(self.current_suite,
                                           location=suite_location)
        # Stored on the test so stopTest can compute the duration.
        setattr(test, "startTime", datetime.datetime.now())
        self.messages.testStarted(self.getTestName(test), location=location)

    def stopTest(self, test):
        """Report the end of *test* with its duration in milliseconds."""
        start = getattr(test, "startTime", datetime.datetime.now())
        d = datetime.datetime.now() - start
        duration = d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000
        self.messages.testFinished(self.getTestName(test),
                                   duration=int(duration))

    def finalize(self, result):
        """Close the suite that is still open, if any."""
        if self.current_suite:
            self.messages.testSuiteFinished(self.current_suite)
            self.current_suite = None


class TeamcityNoseRunner(unittest.TextTestRunner):
    """Test runner that supports teamcity output
    """

    def __init__(self, stream=sys.stdout, descriptions=1, verbosity=1,
                 config=None):
        """Store *config* (a default ``Config`` when None) and init the base runner."""
        if config is None:
            config = Config()
        self.config = config

        unittest.TextTestRunner.__init__(self, stream, descriptions,
                                         verbosity)

    def _makeResult(self):
        """Return the ``TeamcityPlugin`` used as the result object."""
        return TeamcityPlugin(self.stream,
                              self.descriptions,
                              self.verbosity,
                              self.config)

    def run(self, test):
        """Overrides to provide plugin hooks and defer all output to
        the test result class.
        """
        # for 2.5 compat
        plugins = self.config.plugins
        plugins.configure(self.config.options, self.config)
        plugins.begin()
        wrapper = plugins.prepareTest(test)
        if wrapper is not None:
            test = wrapper

        # plugins can decorate or capture the output stream
        wrapped = self.config.plugins.setOutputStream(self.stream)
        if wrapped is not None:
            self.stream = wrapped

        result = self._makeResult()
        test(result)
        result.endLastSuite()
        plugins.finalize(result)

        return result