import traceback, sys from unittest import TestResult import datetime from tcmessages import TeamcityServiceMessages PYTHON_VERSION_MAJOR = sys.version_info[0] def strclass(cls): if not cls.__name__: return cls.__module__ return "%s.%s" % (cls.__module__, cls.__name__) def smart_str(s): encoding = 'utf-8' errors = 'strict' if PYTHON_VERSION_MAJOR < 3: is_string = isinstance(s, basestring) else: is_string = isinstance(s, str) if not is_string: try: return str(s) except UnicodeEncodeError: if isinstance(s, Exception): # An Exception subclass containing non-ASCII data that doesn't # know how to print itself properly. We shouldn't raise a # further exception. return ' '.join([smart_str(arg) for arg in s]) return unicode(s).encode(encoding, errors) elif isinstance(s, unicode): return s.encode(encoding, errors) else: return s class TeamcityTestResult(TestResult): def __init__(self, stream=sys.stdout, *args, **kwargs): TestResult.__init__(self) for arg, value in kwargs.items(): setattr(self, arg, value) self.output = stream self.messages = TeamcityServiceMessages(self.output, prepend_linebreak=True) self.messages.testMatrixEntered() self.current_failed = False self.current_suite = None self.subtest_suite = None def find_first(self, val): quot = val[0] count = 1 quote_ind = val[count:].find(quot) while quote_ind != -1 and val[count + quote_ind - 1] == "\\": count = count + quote_ind + 1 quote_ind = val[count:].find(quot) return val[0:quote_ind + count + 1] def find_second(self, val): val_index = val.find("!=") if val_index != -1: count = 1 val = val[val_index + 2:].strip() quot = val[0] quote_ind = val[count:].find(quot) while quote_ind != -1 and val[count + quote_ind - 1] == "\\": count = count + quote_ind + 1 quote_ind = val[count:].find(quot) return val[0:quote_ind + count + 1] else: quot = val[-1] quote_ind = val[:len(val) - 1].rfind(quot) while quote_ind != -1 and val[quote_ind - 1] == "\\": quote_ind = val[:quote_ind - 1].rfind(quot) return val[quote_ind:] def formatErr(self, err): exctype, value, tb = err return ''.join(traceback.format_exception(exctype, value, tb)) def getTestName(self, test, is_subtest=False): if is_subtest: test_name = self.getTestName(test.test_case) return "{} {}".format(test_name, test._subDescription()) if hasattr(test, '_testMethodName'): if test._testMethodName == "runTest": return str(test) return test._testMethodName else: test_name = str(test) whitespace_index = test_name.index(" ") if whitespace_index != -1: test_name = test_name[:whitespace_index] return test_name def getTestId(self, test): return test.id def addSuccess(self, test): TestResult.addSuccess(self, test) def addError(self, test, err): self.init_suite(test) self.current_failed = True TestResult.addError(self, test, err) err = self._exc_info_to_string(err, test) self.messages.testStarted(self.getTestName(test)) self.messages.testError(self.getTestName(test), message='Error', details=err) def find_error_value(self, err): error_value = traceback.extract_tb(err) error_value = error_value[-1][-1] return error_value.split('assert')[-1].strip() def addFailure(self, test, err): self.init_suite(test) self.current_failed = True TestResult.addFailure(self, test, err) error_value = smart_str(err[1]) if not len(error_value): # means it's test function and we have to extract value from traceback error_value = self.find_error_value(err[2]) self_find_first = self.find_first(error_value) self_find_second = self.find_second(error_value) quotes = ["'", '"'] if (self_find_first[0] == self_find_first[-1] and self_find_first[0] in quotes and self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes): # let's unescape strings to show sexy multiline diff in PyCharm. # By default all caret return chars are escaped by testing framework first = self._unescape(self_find_first) second = self._unescape(self_find_second) else: first = second = "" err = self._exc_info_to_string(err, test) self.messages.testStarted(self.getTestName(test)) self.messages.testFailed(self.getTestName(test), message='Failure', details=err, expected=first, actual=second) def addSkip(self, test, reason): self.init_suite(test) self.current_failed = True self.messages.testIgnored(self.getTestName(test), message=reason) def __getSuite(self, test): if hasattr(test, "suite"): suite = strclass(test.suite) suite_location = test.suite.location location = test.suite.abs_location if hasattr(test, "lineno"): location = location + ":" + str(test.lineno) else: location = location + ":" + str(test.test.lineno) else: import inspect try: source_file = inspect.getsourcefile(test.__class__) if source_file: source_dir_splitted = source_file.split("/")[:-1] source_dir = "/".join(source_dir_splitted) + "/" else: source_dir = "" except TypeError: source_dir = "" suite = strclass(test.__class__) suite_location = "python_uttestid://" + source_dir + suite location = "python_uttestid://" + source_dir + str(test.id()) return (suite, location, suite_location) def startTest(self, test): self.current_failed = False setattr(test, "startTime", datetime.datetime.now()) def init_suite(self, test): suite, location, suite_location = self.__getSuite(test) if suite != self.current_suite: if self.current_suite: self.messages.testSuiteFinished(self.current_suite) self.current_suite = suite self.messages.testSuiteStarted(self.current_suite, location=suite_location) return location def stopTest(self, test): start = getattr(test, "startTime", datetime.datetime.now()) d = datetime.datetime.now() - start duration = d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000 if not self.subtest_suite: if not self.current_failed: location = self.init_suite(test) self.messages.testStarted(self.getTestName(test), location=location) self.messages.testFinished(self.getTestName(test), duration=int(duration)) else: self.messages.testSuiteFinished(self.subtest_suite) self.subtest_suite = None def addSubTest(self, test, subtest, err): suite_name = self.getTestName(test) # + " (subTests)" if not self.subtest_suite: self.subtest_suite = suite_name self.messages.testSuiteStarted(self.subtest_suite) else: if suite_name != self.subtest_suite: self.messages.testSuiteFinished(self.subtest_suite) self.subtest_suite = suite_name self.messages.testSuiteStarted(self.subtest_suite) name = self.getTestName(subtest, True) if err is not None: error = self._exc_info_to_string(err, test) self.messages.testStarted(name) self.messages.testFailed(name, message='Failure', details=error) else: self.messages.testStarted(name) self.messages.testFinished(name) def endLastSuite(self): if self.current_suite: self.messages.testSuiteFinished(self.current_suite) self.current_suite = None def _unescape(self, text): # do not use text.decode('string_escape'), it leads to problems with different string encodings given return text.replace("\\n", "\n") class TeamcityTestRunner(object): def __init__(self, stream=sys.stdout): self.stream = stream def _makeResult(self, **kwargs): return TeamcityTestResult(self.stream, **kwargs) def run(self, test, **kwargs): result = self._makeResult(**kwargs) result.messages.testCount(test.countTestCases()) test(result) result.endLastSuite() return result