summaryrefslogtreecommitdiff
path: root/lib/python2.7/ctypes/test/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/ctypes/test/__init__.py')
-rw-r--r--lib/python2.7/ctypes/test/__init__.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/lib/python2.7/ctypes/test/__init__.py b/lib/python2.7/ctypes/test/__init__.py
new file mode 100644
index 0000000..808e418
--- /dev/null
+++ b/lib/python2.7/ctypes/test/__init__.py
@@ -0,0 +1,208 @@
+import os, sys, unittest, getopt, time
+
+use_resources = []
+
+class ResourceDenied(Exception):
+ """Test skipped because it requested a disallowed resource.
+
+ This is raised when a test calls requires() for a resource that
+ has not be enabled. Resources are defined by test modules.
+ """
+
+def is_resource_enabled(resource):
+ """Test whether a resource is enabled.
+
+ If the caller's module is __main__ then automatically return True."""
+ if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
+ return True
+ result = use_resources is not None and \
+ (resource in use_resources or "*" in use_resources)
+ if not result:
+ _unavail[resource] = None
+ return result
+
+_unavail = {}
+def requires(resource, msg=None):
+ """Raise ResourceDenied if the specified resource is not available.
+
+ If the caller's module is __main__ then automatically return True."""
+ # see if the caller's module is __main__ - if so, treat as if
+ # the resource was set
+ if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
+ return
+ if not is_resource_enabled(resource):
+ if msg is None:
+ msg = "Use of the `%s' resource not enabled" % resource
+ raise ResourceDenied(msg)
+
+def find_package_modules(package, mask):
+ import fnmatch
+ if (hasattr(package, "__loader__") and
+ hasattr(package.__loader__, '_files')):
+ path = package.__name__.replace(".", os.path.sep)
+ mask = os.path.join(path, mask)
+ for fnm in package.__loader__._files.iterkeys():
+ if fnmatch.fnmatchcase(fnm, mask):
+ yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
+ else:
+ path = package.__path__[0]
+ for fnm in os.listdir(path):
+ if fnmatch.fnmatchcase(fnm, mask):
+ yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
+
+def get_tests(package, mask, verbosity, exclude=()):
+ """Return a list of skipped test modules, and a list of test cases."""
+ tests = []
+ skipped = []
+ for modname in find_package_modules(package, mask):
+ if modname.split(".")[-1] in exclude:
+ skipped.append(modname)
+ if verbosity > 1:
+ print >> sys.stderr, "Skipped %s: excluded" % modname
+ continue
+ try:
+ mod = __import__(modname, globals(), locals(), ['*'])
+ except (ResourceDenied, unittest.SkipTest) as detail:
+ skipped.append(modname)
+ if verbosity > 1:
+ print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
+ continue
+ for name in dir(mod):
+ if name.startswith("_"):
+ continue
+ o = getattr(mod, name)
+ if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
+ tests.append(o)
+ return skipped, tests
+
+def usage():
+ print __doc__
+ return 1
+
+def test_with_refcounts(runner, verbosity, testcase):
+ """Run testcase several times, tracking reference counts."""
+ import gc
+ import ctypes
+ ptc = ctypes._pointer_type_cache.copy()
+ cfc = ctypes._c_functype_cache.copy()
+ wfc = ctypes._win_functype_cache.copy()
+
+ # when searching for refcount leaks, we have to manually reset any
+ # caches that ctypes has.
+ def cleanup():
+ ctypes._pointer_type_cache = ptc.copy()
+ ctypes._c_functype_cache = cfc.copy()
+ ctypes._win_functype_cache = wfc.copy()
+ gc.collect()
+
+ test = unittest.makeSuite(testcase)
+ for i in range(5):
+ rc = sys.gettotalrefcount()
+ runner.run(test)
+ cleanup()
+ COUNT = 5
+ refcounts = [None] * COUNT
+ for i in range(COUNT):
+ rc = sys.gettotalrefcount()
+ runner.run(test)
+ cleanup()
+ refcounts[i] = sys.gettotalrefcount() - rc
+ if filter(None, refcounts):
+ print "%s leaks:\n\t" % testcase, refcounts
+ elif verbosity:
+ print "%s: ok." % testcase
+
+class TestRunner(unittest.TextTestRunner):
+ def run(self, test, skipped):
+ "Run the given test case or test suite."
+ # Same as unittest.TextTestRunner.run, except that it reports
+ # skipped tests.
+ result = self._makeResult()
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ timeTaken = stopTime - startTime
+ result.printErrors()
+ self.stream.writeln(result.separator2)
+ run = result.testsRun
+ if _unavail: #skipped:
+ requested = _unavail.keys()
+ requested.sort()
+ self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
+ (run, run != 1 and "s" or "", timeTaken,
+ len(skipped),
+ len(skipped) != 1 and "s" or ""))
+ self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
+ else:
+ self.stream.writeln("Ran %d test%s in %.3fs" %
+ (run, run != 1 and "s" or "", timeTaken))
+ self.stream.writeln()
+ if not result.wasSuccessful():
+ self.stream.write("FAILED (")
+ failed, errored = map(len, (result.failures, result.errors))
+ if failed:
+ self.stream.write("failures=%d" % failed)
+ if errored:
+ if failed: self.stream.write(", ")
+ self.stream.write("errors=%d" % errored)
+ self.stream.writeln(")")
+ else:
+ self.stream.writeln("OK")
+ return result
+
+
+def main(*packages):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
+ except getopt.error:
+ return usage()
+
+ verbosity = 1
+ search_leaks = False
+ exclude = []
+ for flag, value in opts:
+ if flag == "-q":
+ verbosity -= 1
+ elif flag == "-v":
+ verbosity += 1
+ elif flag == "-r":
+ try:
+ sys.gettotalrefcount
+ except AttributeError:
+ print >> sys.stderr, "-r flag requires Python debug build"
+ return -1
+ search_leaks = True
+ elif flag == "-u":
+ use_resources.extend(value.split(","))
+ elif flag == "-x":
+ exclude.extend(value.split(","))
+
+ mask = "test_*.py"
+ if args:
+ mask = args[0]
+
+ for package in packages:
+ run_tests(package, mask, verbosity, search_leaks, exclude)
+
+
+def run_tests(package, mask, verbosity, search_leaks, exclude):
+ skipped, testcases = get_tests(package, mask, verbosity, exclude)
+ runner = TestRunner(verbosity=verbosity)
+
+ suites = [unittest.makeSuite(o) for o in testcases]
+ suite = unittest.TestSuite(suites)
+ result = runner.run(suite, skipped)
+
+ if search_leaks:
+ # hunt for refcount leaks
+ runner = BasicTestRunner()
+ for t in testcases:
+ test_with_refcounts(runner, verbosity, t)
+
+ return bool(result.errors)
+
+class BasicTestRunner:
+ def run(self, test):
+ result = unittest.TestResult()
+ test(result)
+ return result