summaryrefslogtreecommitdiff
path: root/lib/python2.7/ctypes/test/__init__.py
blob: 808e4185ef0a25f53d9eb348d4da5a079c6e8d3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import os, sys, unittest, getopt, time

use_resources = []

class ResourceDenied(Exception):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not be enabled.  Resources are defined by test modules.
    """

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    If the caller's module is __main__ then automatically return True."""
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return True
    result = use_resources is not None and \
           (resource in use_resources or "*" in use_resources)
    if not result:
        _unavail[resource] = None
    return result

_unavail = {}
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)

def find_package_modules(package, mask):
    import fnmatch
    if (hasattr(package, "__loader__") and
            hasattr(package.__loader__, '_files')):
        path = package.__name__.replace(".", os.path.sep)
        mask = os.path.join(path, mask)
        for fnm in package.__loader__._files.iterkeys():
            if fnmatch.fnmatchcase(fnm, mask):
                yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
    else:
        path = package.__path__[0]
        for fnm in os.listdir(path):
            if fnmatch.fnmatchcase(fnm, mask):
                yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])

def get_tests(package, mask, verbosity, exclude=()):
    """Return a list of skipped test modules, and a list of test cases."""
    tests = []
    skipped = []
    for modname in find_package_modules(package, mask):
        if modname.split(".")[-1] in exclude:
            skipped.append(modname)
            if verbosity > 1:
                print >> sys.stderr, "Skipped %s: excluded" % modname
            continue
        try:
            mod = __import__(modname, globals(), locals(), ['*'])
        except (ResourceDenied, unittest.SkipTest) as detail:
            skipped.append(modname)
            if verbosity > 1:
                print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
            continue
        for name in dir(mod):
            if name.startswith("_"):
                continue
            o = getattr(mod, name)
            if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
                tests.append(o)
    return skipped, tests

def usage():
    print __doc__
    return 1

def test_with_refcounts(runner, verbosity, testcase):
    """Run testcase several times, tracking reference counts."""
    import gc
    import ctypes
    ptc = ctypes._pointer_type_cache.copy()
    cfc = ctypes._c_functype_cache.copy()
    wfc = ctypes._win_functype_cache.copy()

    # when searching for refcount leaks, we have to manually reset any
    # caches that ctypes has.
    def cleanup():
        ctypes._pointer_type_cache = ptc.copy()
        ctypes._c_functype_cache = cfc.copy()
        ctypes._win_functype_cache = wfc.copy()
        gc.collect()

    test = unittest.makeSuite(testcase)
    for i in range(5):
        rc = sys.gettotalrefcount()
        runner.run(test)
        cleanup()
    COUNT = 5
    refcounts = [None] * COUNT
    for i in range(COUNT):
        rc = sys.gettotalrefcount()
        runner.run(test)
        cleanup()
        refcounts[i] = sys.gettotalrefcount() - rc
    if filter(None, refcounts):
        print "%s leaks:\n\t" % testcase, refcounts
    elif verbosity:
        print "%s: ok." % testcase

class TestRunner(unittest.TextTestRunner):
    def run(self, test, skipped):
        "Run the given test case or test suite."
        # Same as unittest.TextTestRunner.run, except that it reports
        # skipped tests.
        result = self._makeResult()
        startTime = time.time()
        test(result)
        stopTime = time.time()
        timeTaken = stopTime - startTime
        result.printErrors()
        self.stream.writeln(result.separator2)
        run = result.testsRun
        if _unavail: #skipped:
            requested = _unavail.keys()
            requested.sort()
            self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
                                (run, run != 1 and "s" or "", timeTaken,
                                 len(skipped),
                                 len(skipped) != 1 and "s" or ""))
            self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
        else:
            self.stream.writeln("Ran %d test%s in %.3fs" %
                                (run, run != 1 and "s" or "", timeTaken))
        self.stream.writeln()
        if not result.wasSuccessful():
            self.stream.write("FAILED (")
            failed, errored = map(len, (result.failures, result.errors))
            if failed:
                self.stream.write("failures=%d" % failed)
            if errored:
                if failed: self.stream.write(", ")
                self.stream.write("errors=%d" % errored)
            self.stream.writeln(")")
        else:
            self.stream.writeln("OK")
        return result


def main(*packages):
    try:
        opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
    except getopt.error:
        return usage()

    verbosity = 1
    search_leaks = False
    exclude = []
    for flag, value in opts:
        if flag == "-q":
            verbosity -= 1
        elif flag == "-v":
            verbosity += 1
        elif flag == "-r":
            try:
                sys.gettotalrefcount
            except AttributeError:
                print >> sys.stderr, "-r flag requires Python debug build"
                return -1
            search_leaks = True
        elif flag == "-u":
            use_resources.extend(value.split(","))
        elif flag == "-x":
            exclude.extend(value.split(","))

    mask = "test_*.py"
    if args:
        mask = args[0]

    for package in packages:
        run_tests(package, mask, verbosity, search_leaks, exclude)


def run_tests(package, mask, verbosity, search_leaks, exclude):
    skipped, testcases = get_tests(package, mask, verbosity, exclude)
    runner = TestRunner(verbosity=verbosity)

    suites = [unittest.makeSuite(o) for o in testcases]
    suite = unittest.TestSuite(suites)
    result = runner.run(suite, skipped)

    if search_leaks:
        # hunt for refcount leaks
        runner = BasicTestRunner()
        for t in testcases:
            test_with_refcounts(runner, verbosity, t)

    return bool(result.errors)

class BasicTestRunner:
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result