diff options
Diffstat (limited to 'python/helpers/pydev/pydev_runfiles.py')
-rw-r--r-- | python/helpers/pydev/pydev_runfiles.py | 831 |
1 files changed, 831 insertions, 0 deletions
diff --git a/python/helpers/pydev/pydev_runfiles.py b/python/helpers/pydev/pydev_runfiles.py new file mode 100644 index 000000000000..bb704db3f240 --- /dev/null +++ b/python/helpers/pydev/pydev_runfiles.py @@ -0,0 +1,831 @@ +from __future__ import nested_scopes + +import fnmatch +import os.path +from pydev_runfiles_coverage import StartCoverageSupport +import pydev_runfiles_unittest +from pydevd_constants import * #@UnusedWildImport +import re +import time +import unittest + + +#======================================================================================================================= +# Configuration +#======================================================================================================================= +class Configuration: + + def __init__( + self, + files_or_dirs='', + verbosity=2, + include_tests=None, + tests=None, + port=None, + files_to_tests=None, + jobs=1, + split_jobs='tests', + coverage_output_dir=None, + coverage_include=None, + coverage_output_file=None, + exclude_files=None, + exclude_tests=None, + include_files=None, + django=False, + ): + self.files_or_dirs = files_or_dirs + self.verbosity = verbosity + self.include_tests = include_tests + self.tests = tests + self.port = port + self.files_to_tests = files_to_tests + self.jobs = jobs + self.split_jobs = split_jobs + self.django = django + + if include_tests: + assert isinstance(include_tests, (list, tuple)) + + if exclude_files: + assert isinstance(exclude_files, (list, tuple)) + + if exclude_tests: + assert isinstance(exclude_tests, (list, tuple)) + + self.exclude_files = exclude_files + self.include_files = include_files + self.exclude_tests = exclude_tests + + self.coverage_output_dir = coverage_output_dir + self.coverage_include = coverage_include + self.coverage_output_file = coverage_output_file + + def __str__(self): + return '''Configuration + - files_or_dirs: %s + - verbosity: %s + - tests: %s + - port: %s + - files_to_tests: %s + - jobs: %s + - split_jobs: %s + + - include_files: %s + - include_tests: %s + + - exclude_files: %s + - exclude_tests: %s + + - coverage_output_dir: %s + - coverage_include_dir: %s + - coverage_output_file: %s + + - django: %s +''' % ( + self.files_or_dirs, + self.verbosity, + self.tests, + self.port, + self.files_to_tests, + self.jobs, + self.split_jobs, + + self.include_files, + self.include_tests, + + self.exclude_files, + self.exclude_tests, + + self.coverage_output_dir, + self.coverage_include, + self.coverage_output_file, + + self.django, + ) + + +#======================================================================================================================= +# parse_cmdline +#======================================================================================================================= +def parse_cmdline(argv=None): + """ + Parses command line and returns test directories, verbosity, test filter and test suites + + usage: + runfiles.py -v|--verbosity <level> -t|--tests <Test.test1,Test2> dirs|files + + Multiprocessing options: + jobs=number (with the number of jobs to be used to run the tests) + split_jobs='module'|'tests' + if == module, a given job will always receive all the tests from a module + if == tests, the tests will be split independently of their originating module (default) + + --exclude_files = comma-separated list of patterns with files to exclude (fnmatch style) + --include_files = comma-separated list of patterns with files to include (fnmatch style) + --exclude_tests = comma-separated list of patterns with test names to exclude (fnmatch style) + + Note: if --tests is given, --exclude_files, --include_files and --exclude_tests are ignored! + """ + if argv is None: + argv = sys.argv + + verbosity = 2 + include_tests = None + tests = None + port = None + jobs = 1 + split_jobs = 'tests' + files_to_tests = {} + coverage_output_dir = None + coverage_include = None + exclude_files = None + exclude_tests = None + include_files = None + django = False + + from _pydev_getopt import gnu_getopt + optlist, dirs = gnu_getopt( + argv[1:], "", + [ + "verbosity=", + "tests=", + + "port=", + "config_file=", + + "jobs=", + "split_jobs=", + + "include_tests=", + "include_files=", + + "exclude_files=", + "exclude_tests=", + + "coverage_output_dir=", + "coverage_include=", + + "django=" + ] + ) + + for opt, value in optlist: + if opt in ("-v", "--verbosity"): + verbosity = value + + elif opt in ("-p", "--port"): + port = int(value) + + elif opt in ("-j", "--jobs"): + jobs = int(value) + + elif opt in ("-s", "--split_jobs"): + split_jobs = value + if split_jobs not in ('module', 'tests'): + raise AssertionError('Expected split to be either "module" or "tests". Was :%s' % (split_jobs,)) + + elif opt in ("-d", "--coverage_output_dir",): + coverage_output_dir = value.strip() + + elif opt in ("-i", "--coverage_include",): + coverage_include = value.strip() + + elif opt in ("-I", "--include_tests"): + include_tests = value.split(',') + + elif opt in ("-E", "--exclude_files"): + exclude_files = value.split(',') + + elif opt in ("-F", "--include_files"): + include_files = value.split(',') + + elif opt in ("-e", "--exclude_tests"): + exclude_tests = value.split(',') + + elif opt in ("-t", "--tests"): + tests = value.split(',') + + elif opt in ("--django",): + django = value.strip() in ['true', 'True', '1'] + + elif opt in ("-c", "--config_file"): + config_file = value.strip() + if os.path.exists(config_file): + f = open(config_file, 'rU') + try: + config_file_contents = f.read() + finally: + f.close() + + if config_file_contents: + config_file_contents = config_file_contents.strip() + + if config_file_contents: + for line in config_file_contents.splitlines(): + file_and_test = line.split('|') + if len(file_and_test) == 2: + file, test = file_and_test + if DictContains(files_to_tests, file): + files_to_tests[file].append(test) + else: + files_to_tests[file] = [test] + + else: + sys.stderr.write('Could not find config file: %s\n' % (config_file,)) + + if type([]) != type(dirs): + dirs = [dirs] + + ret_dirs = [] + for d in dirs: + if '|' in d: + #paths may come from the ide separated by | + ret_dirs.extend(d.split('|')) + else: + ret_dirs.append(d) + + verbosity = int(verbosity) + + if tests: + if verbosity > 4: + sys.stdout.write('--tests provided. Ignoring --exclude_files, --exclude_tests and --include_files\n') + exclude_files = exclude_tests = include_files = None + + config = Configuration( + ret_dirs, + verbosity, + include_tests, + tests, + port, + files_to_tests, + jobs, + split_jobs, + coverage_output_dir, + coverage_include, + exclude_files=exclude_files, + exclude_tests=exclude_tests, + include_files=include_files, + django=django, + ) + + if verbosity > 5: + sys.stdout.write(str(config) + '\n') + return config + + +#======================================================================================================================= +# PydevTestRunner +#======================================================================================================================= +class PydevTestRunner(object): + """ finds and runs a file or directory of files as a unit test """ + + __py_extensions = ["*.py", "*.pyw"] + __exclude_files = ["__init__.*"] + + #Just to check that only this attributes will be written to this file + __slots__ = [ + 'verbosity', #Always used + + 'files_to_tests', #If this one is given, the ones below are not used + + 'files_or_dirs', #Files or directories received in the command line + 'include_tests', #The filter used to collect the tests + 'tests', #Strings with the tests to be run + + 'jobs', #Integer with the number of jobs that should be used to run the test cases + 'split_jobs', #String with 'tests' or 'module' (how should the jobs be split) + + 'configuration', + 'coverage', + ] + + def __init__(self, configuration): + self.verbosity = configuration.verbosity + + self.jobs = configuration.jobs + self.split_jobs = configuration.split_jobs + + files_to_tests = configuration.files_to_tests + if files_to_tests: + self.files_to_tests = files_to_tests + self.files_or_dirs = list(files_to_tests.keys()) + self.tests = None + else: + self.files_to_tests = {} + self.files_or_dirs = configuration.files_or_dirs + self.tests = configuration.tests + + self.configuration = configuration + self.__adjust_path() + + + def __adjust_path(self): + """ add the current file or directory to the python path """ + path_to_append = None + for n in xrange(len(self.files_or_dirs)): + dir_name = self.__unixify(self.files_or_dirs[n]) + if os.path.isdir(dir_name): + if not dir_name.endswith("/"): + self.files_or_dirs[n] = dir_name + "/" + path_to_append = os.path.normpath(dir_name) + elif os.path.isfile(dir_name): + path_to_append = os.path.dirname(dir_name) + else: + if not os.path.exists(dir_name): + block_line = '*' * 120 + sys.stderr.write('\n%s\n* PyDev test runner error: %s does not exist.\n%s\n' % (block_line, dir_name, block_line)) + return + msg = ("unknown type. \n%s\nshould be file or a directory.\n" % (dir_name)) + raise RuntimeError(msg) + if path_to_append is not None: + #Add it as the last one (so, first things are resolved against the default dirs and + #if none resolves, then we try a relative import). + sys.path.append(path_to_append) + + def __is_valid_py_file(self, fname): + """ tests that a particular file contains the proper file extension + and is not in the list of files to exclude """ + is_valid_fname = 0 + for invalid_fname in self.__class__.__exclude_files: + is_valid_fname += int(not fnmatch.fnmatch(fname, invalid_fname)) + if_valid_ext = 0 + for ext in self.__class__.__py_extensions: + if_valid_ext += int(fnmatch.fnmatch(fname, ext)) + return is_valid_fname > 0 and if_valid_ext > 0 + + def __unixify(self, s): + """ stupid windows. converts the backslash to forwardslash for consistency """ + return os.path.normpath(s).replace(os.sep, "/") + + def __importify(self, s, dir=False): + """ turns directory separators into dots and removes the ".py*" extension + so the string can be used as import statement """ + if not dir: + dirname, fname = os.path.split(s) + + if fname.count('.') > 1: + #if there's a file named xxx.xx.py, it is not a valid module, so, let's not load it... + return + + imp_stmt_pieces = [dirname.replace("\\", "/").replace("/", "."), os.path.splitext(fname)[0]] + + if len(imp_stmt_pieces[0]) == 0: + imp_stmt_pieces = imp_stmt_pieces[1:] + + return ".".join(imp_stmt_pieces) + + else: #handle dir + return s.replace("\\", "/").replace("/", ".") + + def __add_files(self, pyfiles, root, files): + """ if files match, appends them to pyfiles. used by os.path.walk fcn """ + for fname in files: + if self.__is_valid_py_file(fname): + name_without_base_dir = self.__unixify(os.path.join(root, fname)) + pyfiles.append(name_without_base_dir) + + + def find_import_files(self): + """ return a list of files to import """ + if self.files_to_tests: + pyfiles = self.files_to_tests.keys() + else: + pyfiles = [] + + for base_dir in self.files_or_dirs: + if os.path.isdir(base_dir): + if hasattr(os, 'walk'): + for root, dirs, files in os.walk(base_dir): + + #Note: handling directories that should be excluded from the search because + #they don't have __init__.py + exclude = {} + for d in dirs: + for init in ['__init__.py', '__init__.pyo', '__init__.pyc', '__init__.pyw']: + if os.path.exists(os.path.join(root, d, init).replace('\\', '/')): + break + else: + exclude[d] = 1 + + if exclude: + new = [] + for d in dirs: + if d not in exclude: + new.append(d) + + dirs[:] = new + + self.__add_files(pyfiles, root, files) + else: + # jython2.1 is too old for os.walk! + os.path.walk(base_dir, self.__add_files, pyfiles) + + elif os.path.isfile(base_dir): + pyfiles.append(base_dir) + + if self.configuration.exclude_files or self.configuration.include_files: + ret = [] + for f in pyfiles: + add = True + basename = os.path.basename(f) + if self.configuration.include_files: + add = False + + for pat in self.configuration.include_files: + if fnmatch.fnmatchcase(basename, pat): + add = True + break + + if not add: + if self.verbosity > 3: + sys.stdout.write('Skipped file: %s (did not match any include_files pattern: %s)\n' % (f, self.configuration.include_files)) + + elif self.configuration.exclude_files: + for pat in self.configuration.exclude_files: + if fnmatch.fnmatchcase(basename, pat): + if self.verbosity > 3: + sys.stdout.write('Skipped file: %s (matched exclude_files pattern: %s)\n' % (f, pat)) + + elif self.verbosity > 2: + sys.stdout.write('Skipped file: %s\n' % (f,)) + + add = False + break + + if add: + if self.verbosity > 3: + sys.stdout.write('Adding file: %s for test discovery.\n' % (f,)) + ret.append(f) + + pyfiles = ret + + + return pyfiles + + def __get_module_from_str(self, modname, print_exception, pyfile): + """ Import the module in the given import path. + * Returns the "final" module, so importing "coilib40.subject.visu" + returns the "visu" module, not the "coilib40" as returned by __import__ """ + try: + mod = __import__(modname) + for part in modname.split('.')[1:]: + mod = getattr(mod, part) + return mod + except: + if print_exception: + import pydev_runfiles_xml_rpc + import pydevd_io + buf_err = pydevd_io.StartRedirect(keep_original_redirection=True, std='stderr') + buf_out = pydevd_io.StartRedirect(keep_original_redirection=True, std='stdout') + try: + import traceback;traceback.print_exc() + sys.stderr.write('ERROR: Module: %s could not be imported (file: %s).\n' % (modname, pyfile)) + finally: + pydevd_io.EndRedirect('stderr') + pydevd_io.EndRedirect('stdout') + + pydev_runfiles_xml_rpc.notifyTest( + 'error', buf_out.getvalue(), buf_err.getvalue(), pyfile, modname, 0) + + return None + + def find_modules_from_files(self, pyfiles): + """ returns a list of modules given a list of files """ + #let's make sure that the paths we want are in the pythonpath... + imports = [(s, self.__importify(s)) for s in pyfiles] + + system_paths = [] + for s in sys.path: + system_paths.append(self.__importify(s, True)) + + + ret = [] + for pyfile, imp in imports: + if imp is None: + continue #can happen if a file is not a valid module + choices = [] + for s in system_paths: + if imp.startswith(s): + add = imp[len(s) + 1:] + if add: + choices.append(add) + #sys.stdout.write(' ' + add + ' ') + + if not choices: + sys.stdout.write('PYTHONPATH not found for file: %s\n' % imp) + else: + for i, import_str in enumerate(choices): + print_exception = i == len(choices) - 1 + mod = self.__get_module_from_str(import_str, print_exception, pyfile) + if mod is not None: + ret.append((pyfile, mod, import_str)) + break + + + return ret + + #=================================================================================================================== + # GetTestCaseNames + #=================================================================================================================== + class GetTestCaseNames: + """Yes, we need a class for that (cannot use outer context on jython 2.1)""" + + def __init__(self, accepted_classes, accepted_methods): + self.accepted_classes = accepted_classes + self.accepted_methods = accepted_methods + + def __call__(self, testCaseClass): + """Return a sorted sequence of method names found within testCaseClass""" + testFnNames = [] + className = testCaseClass.__name__ + + if DictContains(self.accepted_classes, className): + for attrname in dir(testCaseClass): + #If a class is chosen, we select all the 'test' methods' + if attrname.startswith('test') and hasattr(getattr(testCaseClass, attrname), '__call__'): + testFnNames.append(attrname) + + else: + for attrname in dir(testCaseClass): + #If we have the class+method name, we must do a full check and have an exact match. + if DictContains(self.accepted_methods, className + '.' + attrname): + if hasattr(getattr(testCaseClass, attrname), '__call__'): + testFnNames.append(attrname) + + #sorted() is not available in jython 2.1 + testFnNames.sort() + return testFnNames + + + def _decorate_test_suite(self, suite, pyfile, module_name): + if isinstance(suite, unittest.TestSuite): + add = False + suite.__pydev_pyfile__ = pyfile + suite.__pydev_module_name__ = module_name + + for t in suite._tests: + t.__pydev_pyfile__ = pyfile + t.__pydev_module_name__ = module_name + if self._decorate_test_suite(t, pyfile, module_name): + add = True + + return add + + elif isinstance(suite, unittest.TestCase): + return True + + else: + return False + + + + def find_tests_from_modules(self, file_and_modules_and_module_name): + """ returns the unittests given a list of modules """ + #Use our own suite! + unittest.TestLoader.suiteClass = pydev_runfiles_unittest.PydevTestSuite + loader = unittest.TestLoader() + + ret = [] + if self.files_to_tests: + for pyfile, m, module_name in file_and_modules_and_module_name: + accepted_classes = {} + accepted_methods = {} + tests = self.files_to_tests[pyfile] + for t in tests: + accepted_methods[t] = t + + loader.getTestCaseNames = self.GetTestCaseNames(accepted_classes, accepted_methods) + + suite = loader.loadTestsFromModule(m) + if self._decorate_test_suite(suite, pyfile, module_name): + ret.append(suite) + return ret + + + if self.tests: + accepted_classes = {} + accepted_methods = {} + + for t in self.tests: + splitted = t.split('.') + if len(splitted) == 1: + accepted_classes[t] = t + + elif len(splitted) == 2: + accepted_methods[t] = t + + loader.getTestCaseNames = self.GetTestCaseNames(accepted_classes, accepted_methods) + + + for pyfile, m, module_name in file_and_modules_and_module_name: + suite = loader.loadTestsFromModule(m) + if self._decorate_test_suite(suite, pyfile, module_name): + ret.append(suite) + + return ret + + + def filter_tests(self, test_objs, internal_call=False): + """ based on a filter name, only return those tests that have + the test case names that match """ + if not internal_call: + if not self.configuration.include_tests and not self.tests and not self.configuration.exclude_tests: + #No need to filter if we have nothing to filter! + return test_objs + + if self.verbosity > 1: + if self.configuration.include_tests: + sys.stdout.write('Tests to include: %s\n' % (self.configuration.include_tests,)) + + if self.tests: + sys.stdout.write('Tests to run: %s\n' % (self.tests,)) + + if self.configuration.exclude_tests: + sys.stdout.write('Tests to exclude: %s\n' % (self.configuration.exclude_tests,)) + + test_suite = [] + for test_obj in test_objs: + + if isinstance(test_obj, unittest.TestSuite): + #Note: keep the suites as they are and just 'fix' the tests (so, don't use the iter_tests). + if test_obj._tests: + test_obj._tests = self.filter_tests(test_obj._tests, True) + if test_obj._tests: #Only add the suite if we still have tests there. + test_suite.append(test_obj) + + elif isinstance(test_obj, unittest.TestCase): + try: + testMethodName = test_obj._TestCase__testMethodName + except AttributeError: + #changed in python 2.5 + testMethodName = test_obj._testMethodName + + add = True + if self.configuration.exclude_tests: + for pat in self.configuration.exclude_tests: + if fnmatch.fnmatchcase(testMethodName, pat): + if self.verbosity > 3: + sys.stdout.write('Skipped test: %s (matched exclude_tests pattern: %s)\n' % (testMethodName, pat)) + + elif self.verbosity > 2: + sys.stdout.write('Skipped test: %s\n' % (testMethodName,)) + + add = False + break + + if add: + if self.__match_tests(self.tests, test_obj, testMethodName): + include = True + if self.configuration.include_tests: + include = False + for pat in self.configuration.include_tests: + if fnmatch.fnmatchcase(testMethodName, pat): + include = True + break + if include: + test_suite.append(test_obj) + else: + if self.verbosity > 3: + sys.stdout.write('Skipped test: %s (did not match any include_tests pattern %s)\n' % (self.configuration.include_tests,)) + return test_suite + + + def iter_tests(self, test_objs): + #Note: not using yield because of Jython 2.1. + tests = [] + for test_obj in test_objs: + if isinstance(test_obj, unittest.TestSuite): + tests.extend(self.iter_tests(test_obj._tests)) + + elif isinstance(test_obj, unittest.TestCase): + tests.append(test_obj) + return tests + + + def list_test_names(self, test_objs): + names = [] + for tc in self.iter_tests(test_objs): + try: + testMethodName = tc._TestCase__testMethodName + except AttributeError: + #changed in python 2.5 + testMethodName = tc._testMethodName + names.append(testMethodName) + return names + + + def __match_tests(self, tests, test_case, test_method_name): + if not tests: + return 1 + + for t in tests: + class_and_method = t.split('.') + if len(class_and_method) == 1: + #only class name + if class_and_method[0] == test_case.__class__.__name__: + return 1 + + elif len(class_and_method) == 2: + if class_and_method[0] == test_case.__class__.__name__ and class_and_method[1] == test_method_name: + return 1 + + return 0 + + + def __match(self, filter_list, name): + """ returns whether a test name matches the test filter """ + if filter_list is None: + return 1 + for f in filter_list: + if re.match(f, name): + return 1 + return 0 + + + def run_tests(self, handle_coverage=True): + """ runs all tests """ + sys.stdout.write("Finding files... ") + files = self.find_import_files() + if self.verbosity > 3: + sys.stdout.write('%s ... done.\n' % (self.files_or_dirs)) + else: + sys.stdout.write('done.\n') + sys.stdout.write("Importing test modules ... ") + + + if handle_coverage: + coverage_files, coverage = StartCoverageSupport(self.configuration) + + file_and_modules_and_module_name = self.find_modules_from_files(files) + sys.stdout.write("done.\n") + + all_tests = self.find_tests_from_modules(file_and_modules_and_module_name) + all_tests = self.filter_tests(all_tests) + + test_suite = pydev_runfiles_unittest.PydevTestSuite(all_tests) + import pydev_runfiles_xml_rpc + pydev_runfiles_xml_rpc.notifyTestsCollected(test_suite.countTestCases()) + + start_time = time.time() + + def run_tests(): + executed_in_parallel = False + if self.jobs > 1: + import pydev_runfiles_parallel + + #What may happen is that the number of jobs needed is lower than the number of jobs requested + #(e.g.: 2 jobs were requested for running 1 test) -- in which case ExecuteTestsInParallel will + #return False and won't run any tests. + executed_in_parallel = pydev_runfiles_parallel.ExecuteTestsInParallel( + all_tests, self.jobs, self.split_jobs, self.verbosity, coverage_files, self.configuration.coverage_include) + + if not executed_in_parallel: + #If in coverage, we don't need to pass anything here (coverage is already enabled for this execution). + runner = pydev_runfiles_unittest.PydevTextTestRunner(stream=sys.stdout, descriptions=1, verbosity=self.verbosity) + sys.stdout.write('\n') + runner.run(test_suite) + + if self.configuration.django: + MyDjangoTestSuiteRunner(run_tests).run_tests([]) + else: + run_tests() + + if handle_coverage: + coverage.stop() + coverage.save() + + total_time = 'Finished in: %.2f secs.' % (time.time() - start_time,) + pydev_runfiles_xml_rpc.notifyTestRunFinished(total_time) + + +try: + from django.test.simple import DjangoTestSuiteRunner +except: + class DjangoTestSuiteRunner: + def __init__(self): + pass + + def run_tests(self, *args, **kwargs): + raise AssertionError("Unable to run suite with DjangoTestSuiteRunner because it couldn't be imported.") + +class MyDjangoTestSuiteRunner(DjangoTestSuiteRunner): + + def __init__(self, on_run_suite): + DjangoTestSuiteRunner.__init__(self) + self.on_run_suite = on_run_suite + + def build_suite(self, *args, **kwargs): + pass + + def suite_result(self, *args, **kwargs): + pass + + def run_suite(self, *args, **kwargs): + self.on_run_suite() + + +#======================================================================================================================= +# main +#======================================================================================================================= +def main(configuration): + PydevTestRunner(configuration).run_tests() |