1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
import os, sys, unittest, getopt, time
use_resources = []
class ResourceDenied(Exception):
"""Test skipped because it requested a disallowed resource.
This is raised when a test calls requires() for a resource that
has not be enabled. Resources are defined by test modules.
"""
def is_resource_enabled(resource):
"""Test whether a resource is enabled.
If the caller's module is __main__ then automatically return True."""
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return True
result = use_resources is not None and \
(resource in use_resources or "*" in use_resources)
if not result:
_unavail[resource] = None
return result
_unavail = {}
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True."""
# see if the caller's module is __main__ - if so, treat as if
# the resource was set
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
def find_package_modules(package, mask):
import fnmatch
if (hasattr(package, "__loader__") and
hasattr(package.__loader__, '_files')):
path = package.__name__.replace(".", os.path.sep)
mask = os.path.join(path, mask)
for fnm in package.__loader__._files.iterkeys():
if fnmatch.fnmatchcase(fnm, mask):
yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
else:
path = package.__path__[0]
for fnm in os.listdir(path):
if fnmatch.fnmatchcase(fnm, mask):
yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
def get_tests(package, mask, verbosity, exclude=()):
"""Return a list of skipped test modules, and a list of test cases."""
tests = []
skipped = []
for modname in find_package_modules(package, mask):
if modname.split(".")[-1] in exclude:
skipped.append(modname)
if verbosity > 1:
print >> sys.stderr, "Skipped %s: excluded" % modname
continue
try:
mod = __import__(modname, globals(), locals(), ['*'])
except (ResourceDenied, unittest.SkipTest) as detail:
skipped.append(modname)
if verbosity > 1:
print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
continue
for name in dir(mod):
if name.startswith("_"):
continue
o = getattr(mod, name)
if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
tests.append(o)
return skipped, tests
def usage():
print __doc__
return 1
def test_with_refcounts(runner, verbosity, testcase):
"""Run testcase several times, tracking reference counts."""
import gc
import ctypes
ptc = ctypes._pointer_type_cache.copy()
cfc = ctypes._c_functype_cache.copy()
wfc = ctypes._win_functype_cache.copy()
# when searching for refcount leaks, we have to manually reset any
# caches that ctypes has.
def cleanup():
ctypes._pointer_type_cache = ptc.copy()
ctypes._c_functype_cache = cfc.copy()
ctypes._win_functype_cache = wfc.copy()
gc.collect()
test = unittest.makeSuite(testcase)
for i in range(5):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
COUNT = 5
refcounts = [None] * COUNT
for i in range(COUNT):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
refcounts[i] = sys.gettotalrefcount() - rc
if filter(None, refcounts):
print "%s leaks:\n\t" % testcase, refcounts
elif verbosity:
print "%s: ok." % testcase
class TestRunner(unittest.TextTestRunner):
def run(self, test, skipped):
"Run the given test case or test suite."
# Same as unittest.TextTestRunner.run, except that it reports
# skipped tests.
result = self._makeResult()
startTime = time.time()
test(result)
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
self.stream.writeln(result.separator2)
run = result.testsRun
if _unavail: #skipped:
requested = _unavail.keys()
requested.sort()
self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
(run, run != 1 and "s" or "", timeTaken,
len(skipped),
len(skipped) != 1 and "s" or ""))
self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
else:
self.stream.writeln("Ran %d test%s in %.3fs" %
(run, run != 1 and "s" or "", timeTaken))
self.stream.writeln()
if not result.wasSuccessful():
self.stream.write("FAILED (")
failed, errored = map(len, (result.failures, result.errors))
if failed:
self.stream.write("failures=%d" % failed)
if errored:
if failed: self.stream.write(", ")
self.stream.write("errors=%d" % errored)
self.stream.writeln(")")
else:
self.stream.writeln("OK")
return result
def main(*packages):
try:
opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
except getopt.error:
return usage()
verbosity = 1
search_leaks = False
exclude = []
for flag, value in opts:
if flag == "-q":
verbosity -= 1
elif flag == "-v":
verbosity += 1
elif flag == "-r":
try:
sys.gettotalrefcount
except AttributeError:
print >> sys.stderr, "-r flag requires Python debug build"
return -1
search_leaks = True
elif flag == "-u":
use_resources.extend(value.split(","))
elif flag == "-x":
exclude.extend(value.split(","))
mask = "test_*.py"
if args:
mask = args[0]
for package in packages:
run_tests(package, mask, verbosity, search_leaks, exclude)
def run_tests(package, mask, verbosity, search_leaks, exclude):
skipped, testcases = get_tests(package, mask, verbosity, exclude)
runner = TestRunner(verbosity=verbosity)
suites = [unittest.makeSuite(o) for o in testcases]
suite = unittest.TestSuite(suites)
result = runner.run(suite, skipped)
if search_leaks:
# hunt for refcount leaks
runner = BasicTestRunner()
for t in testcases:
test_with_refcounts(runner, verbosity, t)
return bool(result.errors)
class BasicTestRunner:
def run(self, test):
result = unittest.TestResult()
test(result)
return result
|