diff options
Diffstat (limited to 'lib/python2.7/ctypes/test/__init__.py')
-rw-r--r-- | lib/python2.7/ctypes/test/__init__.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/lib/python2.7/ctypes/test/__init__.py b/lib/python2.7/ctypes/test/__init__.py new file mode 100644 index 0000000..808e418 --- /dev/null +++ b/lib/python2.7/ctypes/test/__init__.py @@ -0,0 +1,208 @@ +import os, sys, unittest, getopt, time + +use_resources = [] + +class ResourceDenied(Exception): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not be enabled. Resources are defined by test modules. + """ + +def is_resource_enabled(resource): + """Test whether a resource is enabled. + + If the caller's module is __main__ then automatically return True.""" + if sys._getframe().f_back.f_globals.get("__name__") == "__main__": + return True + result = use_resources is not None and \ + (resource in use_resources or "*" in use_resources) + if not result: + _unavail[resource] = None + return result + +_unavail = {} +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available. + + If the caller's module is __main__ then automatically return True.""" + # see if the caller's module is __main__ - if so, treat as if + # the resource was set + if sys._getframe().f_back.f_globals.get("__name__") == "__main__": + return + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the `%s' resource not enabled" % resource + raise ResourceDenied(msg) + +def find_package_modules(package, mask): + import fnmatch + if (hasattr(package, "__loader__") and + hasattr(package.__loader__, '_files')): + path = package.__name__.replace(".", os.path.sep) + mask = os.path.join(path, mask) + for fnm in package.__loader__._files.iterkeys(): + if fnmatch.fnmatchcase(fnm, mask): + yield os.path.splitext(fnm)[0].replace(os.path.sep, ".") + else: + path = package.__path__[0] + for fnm in os.listdir(path): + if fnmatch.fnmatchcase(fnm, mask): + yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0]) + +def get_tests(package, mask, verbosity, exclude=()): + """Return a list of skipped test modules, and a list of test cases.""" + tests = [] + skipped = [] + for modname in find_package_modules(package, mask): + if modname.split(".")[-1] in exclude: + skipped.append(modname) + if verbosity > 1: + print >> sys.stderr, "Skipped %s: excluded" % modname + continue + try: + mod = __import__(modname, globals(), locals(), ['*']) + except (ResourceDenied, unittest.SkipTest) as detail: + skipped.append(modname) + if verbosity > 1: + print >> sys.stderr, "Skipped %s: %s" % (modname, detail) + continue + for name in dir(mod): + if name.startswith("_"): + continue + o = getattr(mod, name) + if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase): + tests.append(o) + return skipped, tests + +def usage(): + print __doc__ + return 1 + +def test_with_refcounts(runner, verbosity, testcase): + """Run testcase several times, tracking reference counts.""" + import gc + import ctypes + ptc = ctypes._pointer_type_cache.copy() + cfc = ctypes._c_functype_cache.copy() + wfc = ctypes._win_functype_cache.copy() + + # when searching for refcount leaks, we have to manually reset any + # caches that ctypes has. + def cleanup(): + ctypes._pointer_type_cache = ptc.copy() + ctypes._c_functype_cache = cfc.copy() + ctypes._win_functype_cache = wfc.copy() + gc.collect() + + test = unittest.makeSuite(testcase) + for i in range(5): + rc = sys.gettotalrefcount() + runner.run(test) + cleanup() + COUNT = 5 + refcounts = [None] * COUNT + for i in range(COUNT): + rc = sys.gettotalrefcount() + runner.run(test) + cleanup() + refcounts[i] = sys.gettotalrefcount() - rc + if filter(None, refcounts): + print "%s leaks:\n\t" % testcase, refcounts + elif verbosity: + print "%s: ok." % testcase + +class TestRunner(unittest.TextTestRunner): + def run(self, test, skipped): + "Run the given test case or test suite." + # Same as unittest.TextTestRunner.run, except that it reports + # skipped tests. + result = self._makeResult() + startTime = time.time() + test(result) + stopTime = time.time() + timeTaken = stopTime - startTime + result.printErrors() + self.stream.writeln(result.separator2) + run = result.testsRun + if _unavail: #skipped: + requested = _unavail.keys() + requested.sort() + self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" % + (run, run != 1 and "s" or "", timeTaken, + len(skipped), + len(skipped) != 1 and "s" or "")) + self.stream.writeln("Unavailable resources: %s" % ", ".join(requested)) + else: + self.stream.writeln("Ran %d test%s in %.3fs" % + (run, run != 1 and "s" or "", timeTaken)) + self.stream.writeln() + if not result.wasSuccessful(): + self.stream.write("FAILED (") + failed, errored = map(len, (result.failures, result.errors)) + if failed: + self.stream.write("failures=%d" % failed) + if errored: + if failed: self.stream.write(", ") + self.stream.write("errors=%d" % errored) + self.stream.writeln(")") + else: + self.stream.writeln("OK") + return result + + +def main(*packages): + try: + opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:") + except getopt.error: + return usage() + + verbosity = 1 + search_leaks = False + exclude = [] + for flag, value in opts: + if flag == "-q": + verbosity -= 1 + elif flag == "-v": + verbosity += 1 + elif flag == "-r": + try: + sys.gettotalrefcount + except AttributeError: + print >> sys.stderr, "-r flag requires Python debug build" + return -1 + search_leaks = True + elif flag == "-u": + use_resources.extend(value.split(",")) + elif flag == "-x": + exclude.extend(value.split(",")) + + mask = "test_*.py" + if args: + mask = args[0] + + for package in packages: + run_tests(package, mask, verbosity, search_leaks, exclude) + + +def run_tests(package, mask, verbosity, search_leaks, exclude): + skipped, testcases = get_tests(package, mask, verbosity, exclude) + runner = TestRunner(verbosity=verbosity) + + suites = [unittest.makeSuite(o) for o in testcases] + suite = unittest.TestSuite(suites) + result = runner.run(suite, skipped) + + if search_leaks: + # hunt for refcount leaks + runner = BasicTestRunner() + for t in testcases: + test_with_refcounts(runner, verbosity, t) + + return bool(result.errors) + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result |