summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-04-12 02:06:07 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-04-12 02:06:07 +0000
commitbc041da43b2e8f78f6b70f4389bd07b48b140840 (patch)
tree2654a25dba2e55d6070ec1a77f308a60805d157f
parentcf7bd137a488a6c8489422e3dbcba3c6abd06ae0 (diff)
parent6756009db06964885a1cf2e7f72d39d12cedf021 (diff)
downloadlinux-x86-studio-main.tar.gz
Snap for 9922117 from 6756009db06964885a1cf2e7f72d39d12cedf021 to studio-giraffe-releasestudio-2022.3.1-rc1studio-2022.3.1-beta2studio-2022.3.1studio-main
Change-Id: I533e94f9dd0377cc83f957d78b31f2180f626b83
-rw-r--r--lib/python3.10/unittest/__init__.py95
-rw-r--r--lib/python3.10/unittest/__main__.py18
-rw-r--r--lib/python3.10/unittest/_log.py86
-rw-r--r--lib/python3.10/unittest/async_case.py168
-rw-r--r--lib/python3.10/unittest/case.py1452
-rw-r--r--lib/python3.10/unittest/loader.py517
-rw-r--r--lib/python3.10/unittest/main.py275
-rw-r--r--lib/python3.10/unittest/mock.py2950
-rw-r--r--lib/python3.10/unittest/result.py242
-rw-r--r--lib/python3.10/unittest/runner.py230
-rw-r--r--lib/python3.10/unittest/signals.py71
-rw-r--r--lib/python3.10/unittest/suite.py379
-rw-r--r--lib/python3.10/unittest/util.py170
13 files changed, 6653 insertions, 0 deletions
diff --git a/lib/python3.10/unittest/__init__.py b/lib/python3.10/unittest/__init__.py
new file mode 100644
index 0000000..348dc47
--- /dev/null
+++ b/lib/python3.10/unittest/__init__.py
@@ -0,0 +1,95 @@
+"""
+Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
+Smalltalk testing framework (used with permission).
+
+This module contains the core framework classes that form the basis of
+specific test cases and suites (TestCase, TestSuite etc.), and also a
+text-based utility class for running the tests and reporting the results
+ (TextTestRunner).
+
+Simple usage:
+
+ import unittest
+
+ class IntegerArithmeticTestCase(unittest.TestCase):
+ def testAdd(self): # test method names begin with 'test'
+ self.assertEqual((1 + 2), 3)
+ self.assertEqual(0 + 1, 1)
+ def testMultiply(self):
+ self.assertEqual((0 * 10), 0)
+ self.assertEqual((5 * 8), 40)
+
+ if __name__ == '__main__':
+ unittest.main()
+
+Further information is available in the bundled documentation, and from
+
+ http://docs.python.org/library/unittest.html
+
+Copyright (c) 1999-2003 Steve Purcell
+Copyright (c) 2003-2010 Python Software Foundation
+This module is free software, and you may redistribute it and/or modify
+it under the same terms as Python itself, so long as this copyright message
+and disclaimer are retained in their original form.
+
+IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+"""
+
+__all__ = ['TestResult', 'TestCase', 'IsolatedAsyncioTestCase', 'TestSuite',
+ 'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
+ 'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
+ 'expectedFailure', 'TextTestResult', 'installHandler',
+ 'registerResult', 'removeResult', 'removeHandler',
+ 'addModuleCleanup']
+
+# Expose obsolete functions for backwards compatibility
+__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
+
+__unittest = True
+
+from .result import TestResult
+from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip,
+ skipIf, skipUnless, expectedFailure)
+from .suite import BaseTestSuite, TestSuite
+from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames,
+ findTestCases)
+from .main import TestProgram, main
+from .runner import TextTestRunner, TextTestResult
+from .signals import installHandler, registerResult, removeResult, removeHandler
+# IsolatedAsyncioTestCase will be imported lazily.
+
+# deprecated
+_TextTestResult = TextTestResult
+
+# There are no tests here, so don't try to run anything discovered from
+# introspecting the symbols (e.g. FunctionTestCase). Instead, all our
+# tests come from within unittest.test.
+def load_tests(loader, tests, pattern):
+ import os.path
+ # top level directory cached on loader instance
+ this_dir = os.path.dirname(__file__)
+ return loader.discover(start_dir=this_dir, pattern=pattern)
+
+
+# Lazy import of IsolatedAsyncioTestCase from .async_case
+# It imports asyncio, which is relatively heavy, but most tests
+# do not need it.
+
+def __dir__():
+ return globals().keys() | {'IsolatedAsyncioTestCase'}
+
+def __getattr__(name):
+ if name == 'IsolatedAsyncioTestCase':
+ global IsolatedAsyncioTestCase
+ from .async_case import IsolatedAsyncioTestCase
+ return IsolatedAsyncioTestCase
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/lib/python3.10/unittest/__main__.py b/lib/python3.10/unittest/__main__.py
new file mode 100644
index 0000000..e5876f5
--- /dev/null
+++ b/lib/python3.10/unittest/__main__.py
@@ -0,0 +1,18 @@
+"""Main entry point"""
+
+import sys
+if sys.argv[0].endswith("__main__.py"):
+ import os.path
+ # We change sys.argv[0] to make help message more useful
+ # use executable without path, unquoted
+ # (it's just a hint anyway)
+ # (if you have spaces in your executable you get what you deserve!)
+ executable = os.path.basename(sys.executable)
+ sys.argv[0] = executable + " -m unittest"
+ del os
+
+__unittest = True
+
+from .main import main
+
+main(module=None)
diff --git a/lib/python3.10/unittest/_log.py b/lib/python3.10/unittest/_log.py
new file mode 100644
index 0000000..94868e5
--- /dev/null
+++ b/lib/python3.10/unittest/_log.py
@@ -0,0 +1,86 @@
+import logging
+import collections
+
+from .case import _BaseTestCaseContext
+
+
+_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
+ ["records", "output"])
+
+class _CapturingHandler(logging.Handler):
+ """
+ A logging handler capturing all (raw and formatted) logging output.
+ """
+
+ def __init__(self):
+ logging.Handler.__init__(self)
+ self.watcher = _LoggingWatcher([], [])
+
+ def flush(self):
+ pass
+
+ def emit(self, record):
+ self.watcher.records.append(record)
+ msg = self.format(record)
+ self.watcher.output.append(msg)
+
+
+class _AssertLogsContext(_BaseTestCaseContext):
+ """A context manager for assertLogs() and assertNoLogs() """
+
+ LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
+
+ def __init__(self, test_case, logger_name, level, no_logs):
+ _BaseTestCaseContext.__init__(self, test_case)
+ self.logger_name = logger_name
+ if level:
+ self.level = logging._nameToLevel.get(level, level)
+ else:
+ self.level = logging.INFO
+ self.msg = None
+ self.no_logs = no_logs
+
+ def __enter__(self):
+ if isinstance(self.logger_name, logging.Logger):
+ logger = self.logger = self.logger_name
+ else:
+ logger = self.logger = logging.getLogger(self.logger_name)
+ formatter = logging.Formatter(self.LOGGING_FORMAT)
+ handler = _CapturingHandler()
+ handler.setLevel(self.level)
+ handler.setFormatter(formatter)
+ self.watcher = handler.watcher
+ self.old_handlers = logger.handlers[:]
+ self.old_level = logger.level
+ self.old_propagate = logger.propagate
+ logger.handlers = [handler]
+ logger.setLevel(self.level)
+ logger.propagate = False
+ if self.no_logs:
+ return
+ return handler.watcher
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.logger.handlers = self.old_handlers
+ self.logger.propagate = self.old_propagate
+ self.logger.setLevel(self.old_level)
+
+ if exc_type is not None:
+ # let unexpected exceptions pass through
+ return False
+
+ if self.no_logs:
+ # assertNoLogs
+ if len(self.watcher.records) > 0:
+ self._raiseFailure(
+ "Unexpected logs found: {!r}".format(
+ self.watcher.output
+ )
+ )
+
+ else:
+ # assertLogs
+ if len(self.watcher.records) == 0:
+ self._raiseFailure(
+ "no logs of level {} or higher triggered on {}"
+ .format(logging.getLevelName(self.level), self.logger.name))
diff --git a/lib/python3.10/unittest/async_case.py b/lib/python3.10/unittest/async_case.py
new file mode 100644
index 0000000..2323119
--- /dev/null
+++ b/lib/python3.10/unittest/async_case.py
@@ -0,0 +1,168 @@
+import asyncio
+import inspect
+
+from .case import TestCase
+
+
+class IsolatedAsyncioTestCase(TestCase):
+ # Names intentionally have a long prefix
+ # to reduce a chance of clashing with user-defined attributes
+ # from inherited test case
+ #
+ # The class doesn't call loop.run_until_complete(self.setUp()) and family
+ # but uses a different approach:
+ # 1. create a long-running task that reads self.setUp()
+ # awaitable from queue along with a future
+ # 2. await the awaitable object passing in and set the result
+ # into the future object
+ # 3. Outer code puts the awaitable and the future object into a queue
+ # with waiting for the future
+ # The trick is necessary because every run_until_complete() call
+ # creates a new task with embedded ContextVar context.
+ # To share contextvars between setUp(), test and tearDown() we need to execute
+ # them inside the same task.
+
+ # Note: the test case modifies event loop policy if the policy was not instantiated
+ # yet.
+ # asyncio.get_event_loop_policy() creates a default policy on demand but never
+ # returns None
+ # I believe this is not an issue in user level tests but python itself for testing
+ # should reset a policy in every test module
+ # by calling asyncio.set_event_loop_policy(None) in tearDownModule()
+
+ def __init__(self, methodName='runTest'):
+ super().__init__(methodName)
+ self._asyncioTestLoop = None
+ self._asyncioCallsQueue = None
+
+ async def asyncSetUp(self):
+ pass
+
+ async def asyncTearDown(self):
+ pass
+
+ def addAsyncCleanup(self, func, /, *args, **kwargs):
+ # A trivial trampoline to addCleanup()
+ # the function exists because it has a different semantics
+ # and signature:
+ # addCleanup() accepts regular functions
+ # but addAsyncCleanup() accepts coroutines
+ #
+ # We intentionally don't add inspect.iscoroutinefunction() check
+ # for func argument because there is no way
+ # to check for async function reliably:
+ # 1. It can be "async def func()" itself
+ # 2. Class can implement "async def __call__()" method
+ # 3. Regular "def func()" that returns awaitable object
+ self.addCleanup(*(func, *args), **kwargs)
+
+ def _callSetUp(self):
+ self.setUp()
+ self._callAsync(self.asyncSetUp)
+
+ def _callTestMethod(self, method):
+ self._callMaybeAsync(method)
+
+ def _callTearDown(self):
+ self._callAsync(self.asyncTearDown)
+ self.tearDown()
+
+ def _callCleanup(self, function, *args, **kwargs):
+ self._callMaybeAsync(function, *args, **kwargs)
+
+ def _callAsync(self, func, /, *args, **kwargs):
+ assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+ ret = func(*args, **kwargs)
+ assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
+ fut = self._asyncioTestLoop.create_future()
+ self._asyncioCallsQueue.put_nowait((fut, ret))
+ return self._asyncioTestLoop.run_until_complete(fut)
+
+ def _callMaybeAsync(self, func, /, *args, **kwargs):
+ assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+ ret = func(*args, **kwargs)
+ if inspect.isawaitable(ret):
+ fut = self._asyncioTestLoop.create_future()
+ self._asyncioCallsQueue.put_nowait((fut, ret))
+ return self._asyncioTestLoop.run_until_complete(fut)
+ else:
+ return ret
+
+ async def _asyncioLoopRunner(self, fut):
+ self._asyncioCallsQueue = queue = asyncio.Queue()
+ fut.set_result(None)
+ while True:
+ query = await queue.get()
+ queue.task_done()
+ if query is None:
+ return
+ fut, awaitable = query
+ try:
+ ret = await awaitable
+ if not fut.cancelled():
+ fut.set_result(ret)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except (BaseException, asyncio.CancelledError) as ex:
+ if not fut.cancelled():
+ fut.set_exception(ex)
+
+ def _setupAsyncioLoop(self):
+ assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ loop.set_debug(True)
+ self._asyncioTestLoop = loop
+ fut = loop.create_future()
+ self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
+ loop.run_until_complete(fut)
+
+ def _tearDownAsyncioLoop(self):
+ assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+ loop = self._asyncioTestLoop
+ self._asyncioTestLoop = None
+ self._asyncioCallsQueue.put_nowait(None)
+ loop.run_until_complete(self._asyncioCallsQueue.join())
+
+ try:
+ # cancel all tasks
+ to_cancel = asyncio.all_tasks(loop)
+ if not to_cancel:
+ return
+
+ for task in to_cancel:
+ task.cancel()
+
+ loop.run_until_complete(
+ asyncio.gather(*to_cancel, return_exceptions=True))
+
+ for task in to_cancel:
+ if task.cancelled():
+ continue
+ if task.exception() is not None:
+ loop.call_exception_handler({
+ 'message': 'unhandled exception during test shutdown',
+ 'exception': task.exception(),
+ 'task': task,
+ })
+ # shutdown asyncgens
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ finally:
+ asyncio.set_event_loop(None)
+ loop.close()
+
+ def run(self, result=None):
+ self._setupAsyncioLoop()
+ try:
+ return super().run(result)
+ finally:
+ self._tearDownAsyncioLoop()
+
+ def debug(self):
+ self._setupAsyncioLoop()
+ super().debug()
+ self._tearDownAsyncioLoop()
+
+ def __del__(self):
+ if self._asyncioTestLoop is not None:
+ self._tearDownAsyncioLoop()
diff --git a/lib/python3.10/unittest/case.py b/lib/python3.10/unittest/case.py
new file mode 100644
index 0000000..61003d0
--- /dev/null
+++ b/lib/python3.10/unittest/case.py
@@ -0,0 +1,1452 @@
+"""Test case implementation"""
+
+import sys
+import functools
+import difflib
+import pprint
+import re
+import warnings
+import collections
+import contextlib
+import traceback
+import types
+
+from . import result
+from .util import (strclass, safe_repr, _count_diff_all_purpose,
+ _count_diff_hashable, _common_shorten_repr)
+
+__unittest = True
+
+_subtest_msg_sentinel = object()
+
+DIFF_OMITTED = ('\nDiff is %s characters long. '
+ 'Set self.maxDiff to None to see it.')
+
+class SkipTest(Exception):
+ """
+ Raise this exception in a test to skip it.
+
+ Usually you can use TestCase.skipTest() or one of the skipping decorators
+ instead of raising this directly.
+ """
+
+class _ShouldStop(Exception):
+ """
+ The test should stop.
+ """
+
+class _UnexpectedSuccess(Exception):
+ """
+ The test was supposed to fail, but it didn't!
+ """
+
+
+class _Outcome(object):
+ def __init__(self, result=None):
+ self.expecting_failure = False
+ self.result = result
+ self.result_supports_subtests = hasattr(result, "addSubTest")
+ self.success = True
+ self.skipped = []
+ self.expectedFailure = None
+ self.errors = []
+
+ @contextlib.contextmanager
+ def testPartExecutor(self, test_case, isTest=False):
+ old_success = self.success
+ self.success = True
+ try:
+ yield
+ except KeyboardInterrupt:
+ raise
+ except SkipTest as e:
+ self.success = False
+ self.skipped.append((test_case, str(e)))
+ except _ShouldStop:
+ pass
+ except:
+ exc_info = sys.exc_info()
+ if self.expecting_failure:
+ self.expectedFailure = exc_info
+ else:
+ self.success = False
+ self.errors.append((test_case, exc_info))
+ # explicitly break a reference cycle:
+ # exc_info -> frame -> exc_info
+ exc_info = None
+ else:
+ if self.result_supports_subtests and self.success:
+ self.errors.append((test_case, None))
+ finally:
+ self.success = self.success and old_success
+
+
+def _id(obj):
+ return obj
+
+
+_module_cleanups = []
+def addModuleCleanup(function, /, *args, **kwargs):
+ """Same as addCleanup, except the cleanup items are called even if
+ setUpModule fails (unlike tearDownModule)."""
+ _module_cleanups.append((function, args, kwargs))
+
+
+def doModuleCleanups():
+ """Execute all module cleanup functions. Normally called for you after
+ tearDownModule."""
+ exceptions = []
+ while _module_cleanups:
+ function, args, kwargs = _module_cleanups.pop()
+ try:
+ function(*args, **kwargs)
+ except Exception as exc:
+ exceptions.append(exc)
+ if exceptions:
+ # Swallows all but first exception. If a multi-exception handler
+ # gets written we should use that here instead.
+ raise exceptions[0]
+
+
+def skip(reason):
+ """
+ Unconditionally skip a test.
+ """
+ def decorator(test_item):
+ if not isinstance(test_item, type):
+ @functools.wraps(test_item)
+ def skip_wrapper(*args, **kwargs):
+ raise SkipTest(reason)
+ test_item = skip_wrapper
+
+ test_item.__unittest_skip__ = True
+ test_item.__unittest_skip_why__ = reason
+ return test_item
+ if isinstance(reason, types.FunctionType):
+ test_item = reason
+ reason = ''
+ return decorator(test_item)
+ return decorator
+
+def skipIf(condition, reason):
+ """
+ Skip a test if the condition is true.
+ """
+ if condition:
+ return skip(reason)
+ return _id
+
+def skipUnless(condition, reason):
+ """
+ Skip a test unless the condition is true.
+ """
+ if not condition:
+ return skip(reason)
+ return _id
+
+def expectedFailure(test_item):
+ test_item.__unittest_expecting_failure__ = True
+ return test_item
+
+def _is_subtype(expected, basetype):
+ if isinstance(expected, tuple):
+ return all(_is_subtype(e, basetype) for e in expected)
+ return isinstance(expected, type) and issubclass(expected, basetype)
+
+class _BaseTestCaseContext:
+
+ def __init__(self, test_case):
+ self.test_case = test_case
+
+ def _raiseFailure(self, standardMsg):
+ msg = self.test_case._formatMessage(self.msg, standardMsg)
+ raise self.test_case.failureException(msg)
+
+class _AssertRaisesBaseContext(_BaseTestCaseContext):
+
+ def __init__(self, expected, test_case, expected_regex=None):
+ _BaseTestCaseContext.__init__(self, test_case)
+ self.expected = expected
+ self.test_case = test_case
+ if expected_regex is not None:
+ expected_regex = re.compile(expected_regex)
+ self.expected_regex = expected_regex
+ self.obj_name = None
+ self.msg = None
+
+ def handle(self, name, args, kwargs):
+ """
+ If args is empty, assertRaises/Warns is being used as a
+ context manager, so check for a 'msg' kwarg and return self.
+ If args is not empty, call a callable passing positional and keyword
+ arguments.
+ """
+ try:
+ if not _is_subtype(self.expected, self._base_type):
+ raise TypeError('%s() arg 1 must be %s' %
+ (name, self._base_type_str))
+ if not args:
+ self.msg = kwargs.pop('msg', None)
+ if kwargs:
+ raise TypeError('%r is an invalid keyword argument for '
+ 'this function' % (next(iter(kwargs)),))
+ return self
+
+ callable_obj, *args = args
+ try:
+ self.obj_name = callable_obj.__name__
+ except AttributeError:
+ self.obj_name = str(callable_obj)
+ with self:
+ callable_obj(*args, **kwargs)
+ finally:
+ # bpo-23890: manually break a reference cycle
+ self = None
+
+
+class _AssertRaisesContext(_AssertRaisesBaseContext):
+ """A context manager used to implement TestCase.assertRaises* methods."""
+
+ _base_type = BaseException
+ _base_type_str = 'an exception type or tuple of exception types'
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ if exc_type is None:
+ try:
+ exc_name = self.expected.__name__
+ except AttributeError:
+ exc_name = str(self.expected)
+ if self.obj_name:
+ self._raiseFailure("{} not raised by {}".format(exc_name,
+ self.obj_name))
+ else:
+ self._raiseFailure("{} not raised".format(exc_name))
+ else:
+ traceback.clear_frames(tb)
+ if not issubclass(exc_type, self.expected):
+ # let unexpected exceptions pass through
+ return False
+ # store exception, without traceback, for later retrieval
+ self.exception = exc_value.with_traceback(None)
+ if self.expected_regex is None:
+ return True
+
+ expected_regex = self.expected_regex
+ if not expected_regex.search(str(exc_value)):
+ self._raiseFailure('"{}" does not match "{}"'.format(
+ expected_regex.pattern, str(exc_value)))
+ return True
+
+ __class_getitem__ = classmethod(types.GenericAlias)
+
+
+class _AssertWarnsContext(_AssertRaisesBaseContext):
+ """A context manager used to implement TestCase.assertWarns* methods."""
+
+ _base_type = Warning
+ _base_type_str = 'a warning type or tuple of warning types'
+
+ def __enter__(self):
+ # The __warningregistry__'s need to be in a pristine state for tests
+ # to work properly.
+ for v in list(sys.modules.values()):
+ if getattr(v, '__warningregistry__', None):
+ v.__warningregistry__ = {}
+ self.warnings_manager = warnings.catch_warnings(record=True)
+ self.warnings = self.warnings_manager.__enter__()
+ warnings.simplefilter("always", self.expected)
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.warnings_manager.__exit__(exc_type, exc_value, tb)
+ if exc_type is not None:
+ # let unexpected exceptions pass through
+ return
+ try:
+ exc_name = self.expected.__name__
+ except AttributeError:
+ exc_name = str(self.expected)
+ first_matching = None
+ for m in self.warnings:
+ w = m.message
+ if not isinstance(w, self.expected):
+ continue
+ if first_matching is None:
+ first_matching = w
+ if (self.expected_regex is not None and
+ not self.expected_regex.search(str(w))):
+ continue
+ # store warning for later retrieval
+ self.warning = w
+ self.filename = m.filename
+ self.lineno = m.lineno
+ return
+ # Now we simply try to choose a helpful failure message
+ if first_matching is not None:
+ self._raiseFailure('"{}" does not match "{}"'.format(
+ self.expected_regex.pattern, str(first_matching)))
+ if self.obj_name:
+ self._raiseFailure("{} not triggered by {}".format(exc_name,
+ self.obj_name))
+ else:
+ self._raiseFailure("{} not triggered".format(exc_name))
+
+
+class _OrderedChainMap(collections.ChainMap):
+ def __iter__(self):
+ seen = set()
+ for mapping in self.maps:
+ for k in mapping:
+ if k not in seen:
+ seen.add(k)
+ yield k
+
+
+class TestCase(object):
+ """A class whose instances are single test cases.
+
+ By default, the test code itself should be placed in a method named
+ 'runTest'.
+
+ If the fixture may be used for many test cases, create as
+ many test methods as are needed. When instantiating such a TestCase
+ subclass, specify in the constructor arguments the name of the test method
+ that the instance is to execute.
+
+ Test authors should subclass TestCase for their own tests. Construction
+ and deconstruction of the test's environment ('fixture') can be
+ implemented by overriding the 'setUp' and 'tearDown' methods respectively.
+
+ If it is necessary to override the __init__ method, the base class
+ __init__ method must always be called. It is important that subclasses
+ should not change the signature of their __init__ method, since instances
+ of the classes are instantiated automatically by parts of the framework
+ in order to be run.
+
+ When subclassing TestCase, you can set these attributes:
+ * failureException: determines which exception will be raised when
+ the instance's assertion methods fail; test methods raising this
+ exception will be deemed to have 'failed' rather than 'errored'.
+ * longMessage: determines whether long messages (including repr of
+ objects used in assert methods) will be printed on failure in *addition*
+ to any explicit message passed.
+ * maxDiff: sets the maximum length of a diff in failure messages
+ by assert methods using difflib. It is looked up as an instance
+ attribute so can be configured by individual tests if required.
+ """
+
+ failureException = AssertionError
+
+ longMessage = True
+
+ maxDiff = 80*8
+
+ # If a string is longer than _diffThreshold, use normal comparison instead
+ # of difflib. See #11763.
+ _diffThreshold = 2**16
+
+ # Attribute used by TestSuite for classSetUp
+
+ _classSetupFailed = False
+
+ _class_cleanups = []
+
+ def __init__(self, methodName='runTest'):
+ """Create an instance of the class that will use the named test
+ method when executed. Raises a ValueError if the instance does
+ not have a method with the specified name.
+ """
+ self._testMethodName = methodName
+ self._outcome = None
+ self._testMethodDoc = 'No test'
+ try:
+ testMethod = getattr(self, methodName)
+ except AttributeError:
+ if methodName != 'runTest':
+ # we allow instantiation with no explicit method name
+ # but not an *incorrect* or missing method name
+ raise ValueError("no such test method in %s: %s" %
+ (self.__class__, methodName))
+ else:
+ self._testMethodDoc = testMethod.__doc__
+ self._cleanups = []
+ self._subtest = None
+
+ # Map types to custom assertEqual functions that will compare
+ # instances of said type in more detail to generate a more useful
+ # error message.
+ self._type_equality_funcs = {}
+ self.addTypeEqualityFunc(dict, 'assertDictEqual')
+ self.addTypeEqualityFunc(list, 'assertListEqual')
+ self.addTypeEqualityFunc(tuple, 'assertTupleEqual')
+ self.addTypeEqualityFunc(set, 'assertSetEqual')
+ self.addTypeEqualityFunc(frozenset, 'assertSetEqual')
+ self.addTypeEqualityFunc(str, 'assertMultiLineEqual')
+
+ def addTypeEqualityFunc(self, typeobj, function):
+ """Add a type specific assertEqual style function to compare a type.
+
+ This method is for use by TestCase subclasses that need to register
+ their own type equality functions to provide nicer error messages.
+
+ Args:
+ typeobj: The data type to call this function on when both values
+ are of the same type in assertEqual().
+ function: The callable taking two arguments and an optional
+ msg= argument that raises self.failureException with a
+ useful error message when the two arguments are not equal.
+ """
+ self._type_equality_funcs[typeobj] = function
+
+ def addCleanup(self, function, /, *args, **kwargs):
+ """Add a function, with arguments, to be called when the test is
+ completed. Functions added are called on a LIFO basis and are
+ called after tearDown on test failure or success.
+
+ Cleanup items are called even if setUp fails (unlike tearDown)."""
+ self._cleanups.append((function, args, kwargs))
+
+ @classmethod
+ def addClassCleanup(cls, function, /, *args, **kwargs):
+ """Same as addCleanup, except the cleanup items are called even if
+ setUpClass fails (unlike tearDownClass)."""
+ cls._class_cleanups.append((function, args, kwargs))
+
+ def setUp(self):
+ "Hook method for setting up the test fixture before exercising it."
+ pass
+
+ def tearDown(self):
+ "Hook method for deconstructing the test fixture after testing it."
+ pass
+
+ @classmethod
+ def setUpClass(cls):
+ "Hook method for setting up class fixture before running tests in the class."
+
+ @classmethod
+ def tearDownClass(cls):
+ "Hook method for deconstructing the class fixture after running all tests in the class."
+
+ def countTestCases(self):
+ return 1
+
+ def defaultTestResult(self):
+ return result.TestResult()
+
+ def shortDescription(self):
+ """Returns a one-line description of the test, or None if no
+ description has been provided.
+
+ The default implementation of this method returns the first line of
+ the specified test method's docstring.
+ """
+ doc = self._testMethodDoc
+ return doc.strip().split("\n")[0].strip() if doc else None
+
+
+ def id(self):
+ return "%s.%s" % (strclass(self.__class__), self._testMethodName)
+
+ def __eq__(self, other):
+ if type(self) is not type(other):
+ return NotImplemented
+
+ return self._testMethodName == other._testMethodName
+
+ def __hash__(self):
+ return hash((type(self), self._testMethodName))
+
+ def __str__(self):
+ return "%s (%s)" % (self._testMethodName, strclass(self.__class__))
+
+ def __repr__(self):
+ return "<%s testMethod=%s>" % \
+ (strclass(self.__class__), self._testMethodName)
+
+ def _addSkip(self, result, test_case, reason):
+ addSkip = getattr(result, 'addSkip', None)
+ if addSkip is not None:
+ addSkip(test_case, reason)
+ else:
+ warnings.warn("TestResult has no addSkip method, skips not reported",
+ RuntimeWarning, 2)
+ result.addSuccess(test_case)
+
+ @contextlib.contextmanager
+ def subTest(self, msg=_subtest_msg_sentinel, **params):
+ """Return a context manager that will return the enclosed block
+ of code in a subtest identified by the optional message and
+ keyword parameters. A failure in the subtest marks the test
+ case as failed but resumes execution at the end of the enclosed
+ block, allowing further test code to be executed.
+ """
+ if self._outcome is None or not self._outcome.result_supports_subtests:
+ yield
+ return
+ parent = self._subtest
+ if parent is None:
+ params_map = _OrderedChainMap(params)
+ else:
+ params_map = parent.params.new_child(params)
+ self._subtest = _SubTest(self, msg, params_map)
+ try:
+ with self._outcome.testPartExecutor(self._subtest, isTest=True):
+ yield
+ if not self._outcome.success:
+ result = self._outcome.result
+ if result is not None and result.failfast:
+ raise _ShouldStop
+ elif self._outcome.expectedFailure:
+ # If the test is expecting a failure, we really want to
+ # stop now and register the expected failure.
+ raise _ShouldStop
+ finally:
+ self._subtest = parent
+
+ def _feedErrorsToResult(self, result, errors):
+ for test, exc_info in errors:
+ if isinstance(test, _SubTest):
+ result.addSubTest(test.test_case, test, exc_info)
+ elif exc_info is not None:
+ if issubclass(exc_info[0], self.failureException):
+ result.addFailure(test, exc_info)
+ else:
+ result.addError(test, exc_info)
+
+ def _addExpectedFailure(self, result, exc_info):
+ try:
+ addExpectedFailure = result.addExpectedFailure
+ except AttributeError:
+ warnings.warn("TestResult has no addExpectedFailure method, reporting as passes",
+ RuntimeWarning)
+ result.addSuccess(self)
+ else:
+ addExpectedFailure(self, exc_info)
+
+ def _addUnexpectedSuccess(self, result):
+ try:
+ addUnexpectedSuccess = result.addUnexpectedSuccess
+ except AttributeError:
+ warnings.warn("TestResult has no addUnexpectedSuccess method, reporting as failure",
+ RuntimeWarning)
+ # We need to pass an actual exception and traceback to addFailure,
+ # otherwise the legacy result can choke.
+ try:
+ raise _UnexpectedSuccess from None
+ except _UnexpectedSuccess:
+ result.addFailure(self, sys.exc_info())
+ else:
+ addUnexpectedSuccess(self)
+
+ def _callSetUp(self):
+ self.setUp()
+
+ def _callTestMethod(self, method):
+ method()
+
+ def _callTearDown(self):
+ self.tearDown()
+
+ def _callCleanup(self, function, /, *args, **kwargs):
+ function(*args, **kwargs)
+
+ def run(self, result=None):
+ if result is None:
+ result = self.defaultTestResult()
+ startTestRun = getattr(result, 'startTestRun', None)
+ stopTestRun = getattr(result, 'stopTestRun', None)
+ if startTestRun is not None:
+ startTestRun()
+ else:
+ stopTestRun = None
+
+ result.startTest(self)
+ try:
+ testMethod = getattr(self, self._testMethodName)
+ if (getattr(self.__class__, "__unittest_skip__", False) or
+ getattr(testMethod, "__unittest_skip__", False)):
+ # If the class or method was skipped.
+ skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
+ or getattr(testMethod, '__unittest_skip_why__', ''))
+ self._addSkip(result, self, skip_why)
+ return result
+
+ expecting_failure = (
+ getattr(self, "__unittest_expecting_failure__", False) or
+ getattr(testMethod, "__unittest_expecting_failure__", False)
+ )
+ outcome = _Outcome(result)
+ try:
+ self._outcome = outcome
+
+ with outcome.testPartExecutor(self):
+ self._callSetUp()
+ if outcome.success:
+ outcome.expecting_failure = expecting_failure
+ with outcome.testPartExecutor(self, isTest=True):
+ self._callTestMethod(testMethod)
+ outcome.expecting_failure = False
+ with outcome.testPartExecutor(self):
+ self._callTearDown()
+
+ self.doCleanups()
+ for test, reason in outcome.skipped:
+ self._addSkip(result, test, reason)
+ self._feedErrorsToResult(result, outcome.errors)
+ if outcome.success:
+ if expecting_failure:
+ if outcome.expectedFailure:
+ self._addExpectedFailure(result, outcome.expectedFailure)
+ else:
+ self._addUnexpectedSuccess(result)
+ else:
+ result.addSuccess(self)
+ return result
+ finally:
+ # explicitly break reference cycles:
+ # outcome.errors -> frame -> outcome -> outcome.errors
+ # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
+ outcome.errors.clear()
+ outcome.expectedFailure = None
+
+ # clear the outcome, no more needed
+ self._outcome = None
+
+ finally:
+ result.stopTest(self)
+ if stopTestRun is not None:
+ stopTestRun()
+
+ def doCleanups(self):
+ """Execute all cleanup functions. Normally called for you after
+ tearDown."""
+ outcome = self._outcome or _Outcome()
+ while self._cleanups:
+ function, args, kwargs = self._cleanups.pop()
+ with outcome.testPartExecutor(self):
+ self._callCleanup(function, *args, **kwargs)
+
+ # return this for backwards compatibility
+ # even though we no longer use it internally
+ return outcome.success
+
+ @classmethod
+ def doClassCleanups(cls):
+ """Execute all class cleanup functions. Normally called for you after
+ tearDownClass."""
+ cls.tearDown_exceptions = []
+ while cls._class_cleanups:
+ function, args, kwargs = cls._class_cleanups.pop()
+ try:
+ function(*args, **kwargs)
+ except Exception:
+ cls.tearDown_exceptions.append(sys.exc_info())
+
+ def __call__(self, *args, **kwds):
+ return self.run(*args, **kwds)
+
+ def debug(self):
+ """Run the test without collecting errors in a TestResult"""
+ testMethod = getattr(self, self._testMethodName)
+ if (getattr(self.__class__, "__unittest_skip__", False) or
+ getattr(testMethod, "__unittest_skip__", False)):
+ # If the class or method was skipped.
+ skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
+ or getattr(testMethod, '__unittest_skip_why__', ''))
+ raise SkipTest(skip_why)
+
+ self._callSetUp()
+ self._callTestMethod(testMethod)
+ self._callTearDown()
+ while self._cleanups:
+ function, args, kwargs = self._cleanups.pop()
+ self._callCleanup(function, *args, **kwargs)
+
+ def skipTest(self, reason):
+ """Skip this test."""
+ raise SkipTest(reason)
+
+ def fail(self, msg=None):
+ """Fail immediately, with the given message."""
+ raise self.failureException(msg)
+
+ def assertFalse(self, expr, msg=None):
+ """Check that the expression is false."""
+ if expr:
+ msg = self._formatMessage(msg, "%s is not false" % safe_repr(expr))
+ raise self.failureException(msg)
+
+ def assertTrue(self, expr, msg=None):
+ """Check that the expression is true."""
+ if not expr:
+ msg = self._formatMessage(msg, "%s is not true" % safe_repr(expr))
+ raise self.failureException(msg)
+
+ def _formatMessage(self, msg, standardMsg):
+ """Honour the longMessage attribute when generating failure messages.
+ If longMessage is False this means:
+ * Use only an explicit message if it is provided
+ * Otherwise use the standard message for the assert
+
+ If longMessage is True:
+ * Use the standard message
+ * If an explicit message is provided, plus ' : ' and the explicit message
+ """
+ if not self.longMessage:
+ return msg or standardMsg
+ if msg is None:
+ return standardMsg
+ try:
+ # don't switch to '{}' formatting in Python 2.X
+ # it changes the way unicode input is handled
+ return '%s : %s' % (standardMsg, msg)
+ except UnicodeDecodeError:
+ return '%s : %s' % (safe_repr(standardMsg), safe_repr(msg))
+
+ def assertRaises(self, expected_exception, *args, **kwargs):
+ """Fail unless an exception of class expected_exception is raised
+ by the callable when invoked with specified positional and
+ keyword arguments. If a different type of exception is
+ raised, it will not be caught, and the test case will be
+ deemed to have suffered an error, exactly as for an
+ unexpected exception.
+
+ If called with the callable and arguments omitted, will return a
+ context object used like this::
+
+ with self.assertRaises(SomeException):
+ do_something()
+
+ An optional keyword argument 'msg' can be provided when assertRaises
+ is used as a context object.
+
+ The context manager keeps a reference to the exception as
+ the 'exception' attribute. This allows you to inspect the
+ exception after the assertion::
+
+ with self.assertRaises(SomeException) as cm:
+ do_something()
+ the_exception = cm.exception
+ self.assertEqual(the_exception.error_code, 3)
+ """
+ context = _AssertRaisesContext(expected_exception, self)
+ try:
+ return context.handle('assertRaises', args, kwargs)
+ finally:
+ # bpo-23890: manually break a reference cycle
+ context = None
+
+ def assertWarns(self, expected_warning, *args, **kwargs):
+ """Fail unless a warning of class warnClass is triggered
+ by the callable when invoked with specified positional and
+ keyword arguments. If a different type of warning is
+ triggered, it will not be handled: depending on the other
+ warning filtering rules in effect, it might be silenced, printed
+ out, or raised as an exception.
+
+ If called with the callable and arguments omitted, will return a
+ context object used like this::
+
+ with self.assertWarns(SomeWarning):
+ do_something()
+
+ An optional keyword argument 'msg' can be provided when assertWarns
+ is used as a context object.
+
+ The context manager keeps a reference to the first matching
+ warning as the 'warning' attribute; similarly, the 'filename'
+ and 'lineno' attributes give you information about the line
+ of Python code from which the warning was triggered.
+ This allows you to inspect the warning after the assertion::
+
+ with self.assertWarns(SomeWarning) as cm:
+ do_something()
+ the_warning = cm.warning
+ self.assertEqual(the_warning.some_attribute, 147)
+ """
+ context = _AssertWarnsContext(expected_warning, self)
+ return context.handle('assertWarns', args, kwargs)
+
+ def assertLogs(self, logger=None, level=None):
+ """Fail unless a log message of level *level* or higher is emitted
+ on *logger_name* or its children. If omitted, *level* defaults to
+ INFO and *logger* defaults to the root logger.
+
+ This method must be used as a context manager, and will yield
+ a recording object with two attributes: `output` and `records`.
+ At the end of the context manager, the `output` attribute will
+ be a list of the matching formatted log messages and the
+ `records` attribute will be a list of the corresponding LogRecord
+ objects.
+
+ Example::
+
+ with self.assertLogs('foo', level='INFO') as cm:
+ logging.getLogger('foo').info('first message')
+ logging.getLogger('foo.bar').error('second message')
+ self.assertEqual(cm.output, ['INFO:foo:first message',
+ 'ERROR:foo.bar:second message'])
+ """
+ # Lazy import to avoid importing logging if it is not needed.
+ from ._log import _AssertLogsContext
+ return _AssertLogsContext(self, logger, level, no_logs=False)
+
+ def assertNoLogs(self, logger=None, level=None):
+ """ Fail unless no log messages of level *level* or higher are emitted
+ on *logger_name* or its children.
+
+ This method must be used as a context manager.
+ """
+ from ._log import _AssertLogsContext
+ return _AssertLogsContext(self, logger, level, no_logs=True)
+
+ def _getAssertEqualityFunc(self, first, second):
+ """Get a detailed comparison function for the types of the two args.
+
+ Returns: A callable accepting (first, second, msg=None) that will
+ raise a failure exception if first != second with a useful human
+ readable error message for those types.
+ """
+ #
+ # NOTE(gregory.p.smith): I considered isinstance(first, type(second))
+ # and vice versa. I opted for the conservative approach in case
+ # subclasses are not intended to be compared in detail to their super
+ # class instances using a type equality func. This means testing
+ # subtypes won't automagically use the detailed comparison. Callers
+ # should use their type specific assertSpamEqual method to compare
+ # subclasses if the detailed comparison is desired and appropriate.
+ # See the discussion in http://bugs.python.org/issue2578.
+ #
+ if type(first) is type(second):
+ asserter = self._type_equality_funcs.get(type(first))
+ if asserter is not None:
+ if isinstance(asserter, str):
+ asserter = getattr(self, asserter)
+ return asserter
+
+ return self._baseAssertEqual
+
+ def _baseAssertEqual(self, first, second, msg=None):
+ """The default assertEqual implementation, not type specific."""
+ if not first == second:
+ standardMsg = '%s != %s' % _common_shorten_repr(first, second)
+ msg = self._formatMessage(msg, standardMsg)
+ raise self.failureException(msg)
+
+ def assertEqual(self, first, second, msg=None):
+ """Fail if the two objects are unequal as determined by the '=='
+ operator.
+ """
+ assertion_func = self._getAssertEqualityFunc(first, second)
+ assertion_func(first, second, msg=msg)
+
+ def assertNotEqual(self, first, second, msg=None):
+ """Fail if the two objects are equal as determined by the '!='
+ operator.
+ """
+ if not first != second:
+ msg = self._formatMessage(msg, '%s == %s' % (safe_repr(first),
+ safe_repr(second)))
+ raise self.failureException(msg)
+
+ def assertAlmostEqual(self, first, second, places=None, msg=None,
+ delta=None):
+ """Fail if the two objects are unequal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero, or by comparing that the
+ difference between the two objects is more than the given
+ delta.
+
+ Note that decimal places (from zero) are usually not the same
+ as significant digits (measured from the most significant digit).
+
+ If the two objects compare equal then they will automatically
+ compare almost equal.
+ """
+ if first == second:
+ # shortcut
+ return
+ if delta is not None and places is not None:
+ raise TypeError("specify delta or places not both")
+
+ diff = abs(first - second)
+ if delta is not None:
+ if diff <= delta:
+ return
+
+ standardMsg = '%s != %s within %s delta (%s difference)' % (
+ safe_repr(first),
+ safe_repr(second),
+ safe_repr(delta),
+ safe_repr(diff))
+ else:
+ if places is None:
+ places = 7
+
+ if round(diff, places) == 0:
+ return
+
+ standardMsg = '%s != %s within %r places (%s difference)' % (
+ safe_repr(first),
+ safe_repr(second),
+ places,
+ safe_repr(diff))
+ msg = self._formatMessage(msg, standardMsg)
+ raise self.failureException(msg)
+
+ def assertNotAlmostEqual(self, first, second, places=None, msg=None,
+ delta=None):
+ """Fail if the two objects are equal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero, or by comparing that the
+ difference between the two objects is less than the given delta.
+
+ Note that decimal places (from zero) are usually not the same
+ as significant digits (measured from the most significant digit).
+
+ Objects that are equal automatically fail.
+ """
+ if delta is not None and places is not None:
+ raise TypeError("specify delta or places not both")
+ diff = abs(first - second)
+ if delta is not None:
+ if not (first == second) and diff > delta:
+ return
+ standardMsg = '%s == %s within %s delta (%s difference)' % (
+ safe_repr(first),
+ safe_repr(second),
+ safe_repr(delta),
+ safe_repr(diff))
+ else:
+ if places is None:
+ places = 7
+ if not (first == second) and round(diff, places) != 0:
+ return
+ standardMsg = '%s == %s within %r places' % (safe_repr(first),
+ safe_repr(second),
+ places)
+
+ msg = self._formatMessage(msg, standardMsg)
+ raise self.failureException(msg)
+
+ def assertSequenceEqual(self, seq1, seq2, msg=None, seq_type=None):
+ """An equality assertion for ordered sequences (like lists and tuples).
+
+ For the purposes of this function, a valid ordered sequence type is one
+ which can be indexed, has a length, and has an equality operator.
+
+ Args:
+ seq1: The first sequence to compare.
+ seq2: The second sequence to compare.
+ seq_type: The expected datatype of the sequences, or None if no
+ datatype should be enforced.
+ msg: Optional message to use on failure instead of a list of
+ differences.
+ """
+ if seq_type is not None:
+ seq_type_name = seq_type.__name__
+ if not isinstance(seq1, seq_type):
+ raise self.failureException('First sequence is not a %s: %s'
+ % (seq_type_name, safe_repr(seq1)))
+ if not isinstance(seq2, seq_type):
+ raise self.failureException('Second sequence is not a %s: %s'
+ % (seq_type_name, safe_repr(seq2)))
+ else:
+ seq_type_name = "sequence"
+
+ differing = None
+ try:
+ len1 = len(seq1)
+ except (TypeError, NotImplementedError):
+ differing = 'First %s has no length. Non-sequence?' % (
+ seq_type_name)
+
+ if differing is None:
+ try:
+ len2 = len(seq2)
+ except (TypeError, NotImplementedError):
+ differing = 'Second %s has no length. Non-sequence?' % (
+ seq_type_name)
+
+ if differing is None:
+ if seq1 == seq2:
+ return
+
+ differing = '%ss differ: %s != %s\n' % (
+ (seq_type_name.capitalize(),) +
+ _common_shorten_repr(seq1, seq2))
+
+ for i in range(min(len1, len2)):
+ try:
+ item1 = seq1[i]
+ except (TypeError, IndexError, NotImplementedError):
+ differing += ('\nUnable to index element %d of first %s\n' %
+ (i, seq_type_name))
+ break
+
+ try:
+ item2 = seq2[i]
+ except (TypeError, IndexError, NotImplementedError):
+ differing += ('\nUnable to index element %d of second %s\n' %
+ (i, seq_type_name))
+ break
+
+ if item1 != item2:
+ differing += ('\nFirst differing element %d:\n%s\n%s\n' %
+ ((i,) + _common_shorten_repr(item1, item2)))
+ break
+ else:
+ if (len1 == len2 and seq_type is None and
+ type(seq1) != type(seq2)):
+ # The sequences are the same, but have differing types.
+ return
+
+ if len1 > len2:
+ differing += ('\nFirst %s contains %d additional '
+ 'elements.\n' % (seq_type_name, len1 - len2))
+ try:
+ differing += ('First extra element %d:\n%s\n' %
+ (len2, safe_repr(seq1[len2])))
+ except (TypeError, IndexError, NotImplementedError):
+ differing += ('Unable to index element %d '
+ 'of first %s\n' % (len2, seq_type_name))
+ elif len1 < len2:
+ differing += ('\nSecond %s contains %d additional '
+ 'elements.\n' % (seq_type_name, len2 - len1))
+ try:
+ differing += ('First extra element %d:\n%s\n' %
+ (len1, safe_repr(seq2[len1])))
+ except (TypeError, IndexError, NotImplementedError):
+ differing += ('Unable to index element %d '
+ 'of second %s\n' % (len1, seq_type_name))
+ standardMsg = differing
+ diffMsg = '\n' + '\n'.join(
+ difflib.ndiff(pprint.pformat(seq1).splitlines(),
+ pprint.pformat(seq2).splitlines()))
+
+ standardMsg = self._truncateMessage(standardMsg, diffMsg)
+ msg = self._formatMessage(msg, standardMsg)
+ self.fail(msg)
+
+ def _truncateMessage(self, message, diff):
+ max_diff = self.maxDiff
+ if max_diff is None or len(diff) <= max_diff:
+ return message + diff
+ return message + (DIFF_OMITTED % len(diff))
+
+ def assertListEqual(self, list1, list2, msg=None):
+ """A list-specific equality assertion.
+
+ Args:
+ list1: The first list to compare.
+ list2: The second list to compare.
+ msg: Optional message to use on failure instead of a list of
+ differences.
+
+ """
+ self.assertSequenceEqual(list1, list2, msg, seq_type=list)
+
+ def assertTupleEqual(self, tuple1, tuple2, msg=None):
+ """A tuple-specific equality assertion.
+
+ Args:
+ tuple1: The first tuple to compare.
+ tuple2: The second tuple to compare.
+ msg: Optional message to use on failure instead of a list of
+ differences.
+ """
+ self.assertSequenceEqual(tuple1, tuple2, msg, seq_type=tuple)
+
+ def assertSetEqual(self, set1, set2, msg=None):
+ """A set-specific equality assertion.
+
+ Args:
+ set1: The first set to compare.
+ set2: The second set to compare.
+ msg: Optional message to use on failure instead of a list of
+ differences.
+
+ assertSetEqual uses ducktyping to support different types of sets, and
+ is optimized for sets specifically (parameters must support a
+ difference method).
+ """
+ try:
+ difference1 = set1.difference(set2)
+ except TypeError as e:
+ self.fail('invalid type when attempting set difference: %s' % e)
+ except AttributeError as e:
+ self.fail('first argument does not support set difference: %s' % e)
+
+ try:
+ difference2 = set2.difference(set1)
+ except TypeError as e:
+ self.fail('invalid type when attempting set difference: %s' % e)
+ except AttributeError as e:
+ self.fail('second argument does not support set difference: %s' % e)
+
+ if not (difference1 or difference2):
+ return
+
+ lines = []
+ if difference1:
+ lines.append('Items in the first set but not the second:')
+ for item in difference1:
+ lines.append(repr(item))
+ if difference2:
+ lines.append('Items in the second set but not the first:')
+ for item in difference2:
+ lines.append(repr(item))
+
+ standardMsg = '\n'.join(lines)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIn(self, member, container, msg=None):
+ """Just like self.assertTrue(a in b), but with a nicer default message."""
+ if member not in container:
+ standardMsg = '%s not found in %s' % (safe_repr(member),
+ safe_repr(container))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertNotIn(self, member, container, msg=None):
+ """Just like self.assertTrue(a not in b), but with a nicer default message."""
+ if member in container:
+ standardMsg = '%s unexpectedly found in %s' % (safe_repr(member),
+ safe_repr(container))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIs(self, expr1, expr2, msg=None):
+ """Just like self.assertTrue(a is b), but with a nicer default message."""
+ if expr1 is not expr2:
+ standardMsg = '%s is not %s' % (safe_repr(expr1),
+ safe_repr(expr2))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIsNot(self, expr1, expr2, msg=None):
+ """Just like self.assertTrue(a is not b), but with a nicer default message."""
+ if expr1 is expr2:
+ standardMsg = 'unexpectedly identical: %s' % (safe_repr(expr1),)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertDictEqual(self, d1, d2, msg=None):
+ self.assertIsInstance(d1, dict, 'First argument is not a dictionary')
+ self.assertIsInstance(d2, dict, 'Second argument is not a dictionary')
+
+ if d1 != d2:
+ standardMsg = '%s != %s' % _common_shorten_repr(d1, d2)
+ diff = ('\n' + '\n'.join(difflib.ndiff(
+ pprint.pformat(d1).splitlines(),
+ pprint.pformat(d2).splitlines())))
+ standardMsg = self._truncateMessage(standardMsg, diff)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertDictContainsSubset(self, subset, dictionary, msg=None):
+ """Checks whether dictionary is a superset of subset."""
+ warnings.warn('assertDictContainsSubset is deprecated',
+ DeprecationWarning,
+ stacklevel=2)
+ missing = []
+ mismatched = []
+ for key, value in subset.items():
+ if key not in dictionary:
+ missing.append(key)
+ elif value != dictionary[key]:
+ mismatched.append('%s, expected: %s, actual: %s' %
+ (safe_repr(key), safe_repr(value),
+ safe_repr(dictionary[key])))
+
+ if not (missing or mismatched):
+ return
+
+ standardMsg = ''
+ if missing:
+ standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in
+ missing)
+ if mismatched:
+ if standardMsg:
+ standardMsg += '; '
+ standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
+
+ self.fail(self._formatMessage(msg, standardMsg))
+
+
+ def assertCountEqual(self, first, second, msg=None):
+ """Asserts that two iterables have the same elements, the same number of
+ times, without regard to order.
+
+ self.assertEqual(Counter(list(first)),
+ Counter(list(second)))
+
+ Example:
+ - [0, 1, 1] and [1, 0, 1] compare equal.
+ - [0, 0, 1] and [0, 1] compare unequal.
+
+ """
+ first_seq, second_seq = list(first), list(second)
+ try:
+ first = collections.Counter(first_seq)
+ second = collections.Counter(second_seq)
+ except TypeError:
+ # Handle case with unhashable elements
+ differences = _count_diff_all_purpose(first_seq, second_seq)
+ else:
+ if first == second:
+ return
+ differences = _count_diff_hashable(first_seq, second_seq)
+
+ if differences:
+ standardMsg = 'Element counts were not equal:\n'
+ lines = ['First has %d, Second has %d: %r' % diff for diff in differences]
+ diffMsg = '\n'.join(lines)
+ standardMsg = self._truncateMessage(standardMsg, diffMsg)
+ msg = self._formatMessage(msg, standardMsg)
+ self.fail(msg)
+
+ def assertMultiLineEqual(self, first, second, msg=None):
+ """Assert that two multi-line strings are equal."""
+ self.assertIsInstance(first, str, 'First argument is not a string')
+ self.assertIsInstance(second, str, 'Second argument is not a string')
+
+ if first != second:
+ # don't use difflib if the strings are too long
+ if (len(first) > self._diffThreshold or
+ len(second) > self._diffThreshold):
+ self._baseAssertEqual(first, second, msg)
+ firstlines = first.splitlines(keepends=True)
+ secondlines = second.splitlines(keepends=True)
+ if len(firstlines) == 1 and first.strip('\r\n') == first:
+ firstlines = [first + '\n']
+ secondlines = [second + '\n']
+ standardMsg = '%s != %s' % _common_shorten_repr(first, second)
+ diff = '\n' + ''.join(difflib.ndiff(firstlines, secondlines))
+ standardMsg = self._truncateMessage(standardMsg, diff)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertLess(self, a, b, msg=None):
+ """Just like self.assertTrue(a < b), but with a nicer default message."""
+ if not a < b:
+ standardMsg = '%s not less than %s' % (safe_repr(a), safe_repr(b))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertLessEqual(self, a, b, msg=None):
+ """Just like self.assertTrue(a <= b), but with a nicer default message."""
+ if not a <= b:
+ standardMsg = '%s not less than or equal to %s' % (safe_repr(a), safe_repr(b))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertGreater(self, a, b, msg=None):
+ """Just like self.assertTrue(a > b), but with a nicer default message."""
+ if not a > b:
+ standardMsg = '%s not greater than %s' % (safe_repr(a), safe_repr(b))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertGreaterEqual(self, a, b, msg=None):
+ """Just like self.assertTrue(a >= b), but with a nicer default message."""
+ if not a >= b:
+ standardMsg = '%s not greater than or equal to %s' % (safe_repr(a), safe_repr(b))
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIsNone(self, obj, msg=None):
+ """Same as self.assertTrue(obj is None), with a nicer default message."""
+ if obj is not None:
+ standardMsg = '%s is not None' % (safe_repr(obj),)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIsNotNone(self, obj, msg=None):
+ """Included for symmetry with assertIsNone."""
+ if obj is None:
+ standardMsg = 'unexpectedly None'
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertIsInstance(self, obj, cls, msg=None):
+ """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
+ default message."""
+ if not isinstance(obj, cls):
+ standardMsg = '%s is not an instance of %r' % (safe_repr(obj), cls)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertNotIsInstance(self, obj, cls, msg=None):
+ """Included for symmetry with assertIsInstance."""
+ if isinstance(obj, cls):
+ standardMsg = '%s is an instance of %r' % (safe_repr(obj), cls)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertRaisesRegex(self, expected_exception, expected_regex,
+ *args, **kwargs):
+ """Asserts that the message in a raised exception matches a regex.
+
+ Args:
+ expected_exception: Exception class expected to be raised.
+ expected_regex: Regex (re.Pattern object or string) expected
+ to be found in error message.
+ args: Function to be called and extra positional args.
+ kwargs: Extra kwargs.
+ msg: Optional message used in case of failure. Can only be used
+ when assertRaisesRegex is used as a context manager.
+ """
+ context = _AssertRaisesContext(expected_exception, self, expected_regex)
+ return context.handle('assertRaisesRegex', args, kwargs)
+
+ def assertWarnsRegex(self, expected_warning, expected_regex,
+ *args, **kwargs):
+ """Asserts that the message in a triggered warning matches a regexp.
+ Basic functioning is similar to assertWarns() with the addition
+ that only warnings whose messages also match the regular expression
+ are considered successful matches.
+
+ Args:
+ expected_warning: Warning class expected to be triggered.
+ expected_regex: Regex (re.Pattern object or string) expected
+ to be found in error message.
+ args: Function to be called and extra positional args.
+ kwargs: Extra kwargs.
+ msg: Optional message used in case of failure. Can only be used
+ when assertWarnsRegex is used as a context manager.
+ """
+ context = _AssertWarnsContext(expected_warning, self, expected_regex)
+ return context.handle('assertWarnsRegex', args, kwargs)
+
+ def assertRegex(self, text, expected_regex, msg=None):
+ """Fail the test unless the text matches the regular expression."""
+ if isinstance(expected_regex, (str, bytes)):
+ assert expected_regex, "expected_regex must not be empty."
+ expected_regex = re.compile(expected_regex)
+ if not expected_regex.search(text):
+ standardMsg = "Regex didn't match: %r not found in %r" % (
+ expected_regex.pattern, text)
+ # _formatMessage ensures the longMessage option is respected
+ msg = self._formatMessage(msg, standardMsg)
+ raise self.failureException(msg)
+
+ def assertNotRegex(self, text, unexpected_regex, msg=None):
+ """Fail the test if the text matches the regular expression."""
+ if isinstance(unexpected_regex, (str, bytes)):
+ unexpected_regex = re.compile(unexpected_regex)
+ match = unexpected_regex.search(text)
+ if match:
+ standardMsg = 'Regex matched: %r matches %r in %r' % (
+ text[match.start() : match.end()],
+ unexpected_regex.pattern,
+ text)
+ # _formatMessage ensures the longMessage option is respected
+ msg = self._formatMessage(msg, standardMsg)
+ raise self.failureException(msg)
+
+
+ def _deprecate(original_func):
+ def deprecated_func(*args, **kwargs):
+ warnings.warn(
+ 'Please use {0} instead.'.format(original_func.__name__),
+ DeprecationWarning, 2)
+ return original_func(*args, **kwargs)
+ return deprecated_func
+
+ # see #9424
+ failUnlessEqual = assertEquals = _deprecate(assertEqual)
+ failIfEqual = assertNotEquals = _deprecate(assertNotEqual)
+ failUnlessAlmostEqual = assertAlmostEquals = _deprecate(assertAlmostEqual)
+ failIfAlmostEqual = assertNotAlmostEquals = _deprecate(assertNotAlmostEqual)
+ failUnless = assert_ = _deprecate(assertTrue)
+ failUnlessRaises = _deprecate(assertRaises)
+ failIf = _deprecate(assertFalse)
+ assertRaisesRegexp = _deprecate(assertRaisesRegex)
+ assertRegexpMatches = _deprecate(assertRegex)
+ assertNotRegexpMatches = _deprecate(assertNotRegex)
+
+
+
+class FunctionTestCase(TestCase):
+ """A test case that wraps a test function.
+
+ This is useful for slipping pre-existing test functions into the
+ unittest framework. Optionally, set-up and tidy-up functions can be
+ supplied. As with TestCase, the tidy-up ('tearDown') function will
+ always be called if the set-up ('setUp') function ran successfully.
+ """
+
+ def __init__(self, testFunc, setUp=None, tearDown=None, description=None):
+ super(FunctionTestCase, self).__init__()
+ self._setUpFunc = setUp
+ self._tearDownFunc = tearDown
+ self._testFunc = testFunc
+ self._description = description
+
+ def setUp(self):
+ if self._setUpFunc is not None:
+ self._setUpFunc()
+
+ def tearDown(self):
+ if self._tearDownFunc is not None:
+ self._tearDownFunc()
+
+ def runTest(self):
+ self._testFunc()
+
+ def id(self):
+ return self._testFunc.__name__
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return NotImplemented
+
+ return self._setUpFunc == other._setUpFunc and \
+ self._tearDownFunc == other._tearDownFunc and \
+ self._testFunc == other._testFunc and \
+ self._description == other._description
+
+ def __hash__(self):
+ return hash((type(self), self._setUpFunc, self._tearDownFunc,
+ self._testFunc, self._description))
+
+ def __str__(self):
+ return "%s (%s)" % (strclass(self.__class__),
+ self._testFunc.__name__)
+
+ def __repr__(self):
+ return "<%s tec=%s>" % (strclass(self.__class__),
+ self._testFunc)
+
+ def shortDescription(self):
+ if self._description is not None:
+ return self._description
+ doc = self._testFunc.__doc__
+ return doc and doc.split("\n")[0].strip() or None
+
+
+class _SubTest(TestCase):
+
+ def __init__(self, test_case, message, params):
+ super().__init__()
+ self._message = message
+ self.test_case = test_case
+ self.params = params
+ self.failureException = test_case.failureException
+
+ def runTest(self):
+ raise NotImplementedError("subtests cannot be run directly")
+
+ def _subDescription(self):
+ parts = []
+ if self._message is not _subtest_msg_sentinel:
+ parts.append("[{}]".format(self._message))
+ if self.params:
+ params_desc = ', '.join(
+ "{}={!r}".format(k, v)
+ for (k, v) in self.params.items())
+ parts.append("({})".format(params_desc))
+ return " ".join(parts) or '(<subtest>)'
+
+ def id(self):
+ return "{} {}".format(self.test_case.id(), self._subDescription())
+
+ def shortDescription(self):
+ """Returns a one-line description of the subtest, or None if no
+ description has been provided.
+ """
+ return self.test_case.shortDescription()
+
+ def __str__(self):
+ return "{} {}".format(self.test_case, self._subDescription())
diff --git a/lib/python3.10/unittest/loader.py b/lib/python3.10/unittest/loader.py
new file mode 100644
index 0000000..ba7105e
--- /dev/null
+++ b/lib/python3.10/unittest/loader.py
@@ -0,0 +1,517 @@
+"""Loading unittests."""
+
+import os
+import re
+import sys
+import traceback
+import types
+import functools
+import warnings
+
+from fnmatch import fnmatch, fnmatchcase
+
+from . import case, suite, util
+
+__unittest = True
+
+# what about .pyc (etc)
+# we would need to avoid loading the same tests multiple times
+# from '.py', *and* '.pyc'
+VALID_MODULE_NAME = re.compile(r'[_a-z]\w*\.py$', re.IGNORECASE)
+
+
+class _FailedTest(case.TestCase):
+ _testMethodName = None
+
+ def __init__(self, method_name, exception):
+ self._exception = exception
+ super(_FailedTest, self).__init__(method_name)
+
+ def __getattr__(self, name):
+ if name != self._testMethodName:
+ return super(_FailedTest, self).__getattr__(name)
+ def testFailure():
+ raise self._exception
+ return testFailure
+
+
+def _make_failed_import_test(name, suiteClass):
+ message = 'Failed to import test module: %s\n%s' % (
+ name, traceback.format_exc())
+ return _make_failed_test(name, ImportError(message), suiteClass, message)
+
+def _make_failed_load_tests(name, exception, suiteClass):
+ message = 'Failed to call load_tests:\n%s' % (traceback.format_exc(),)
+ return _make_failed_test(
+ name, exception, suiteClass, message)
+
+def _make_failed_test(methodname, exception, suiteClass, message):
+ test = _FailedTest(methodname, exception)
+ return suiteClass((test,)), message
+
+def _make_skipped_test(methodname, exception, suiteClass):
+ @case.skip(str(exception))
+ def testSkipped(self):
+ pass
+ attrs = {methodname: testSkipped}
+ TestClass = type("ModuleSkipped", (case.TestCase,), attrs)
+ return suiteClass((TestClass(methodname),))
+
+def _jython_aware_splitext(path):
+ if path.lower().endswith('$py.class'):
+ return path[:-9]
+ return os.path.splitext(path)[0]
+
+
+class TestLoader(object):
+ """
+ This class is responsible for loading tests according to various criteria
+ and returning them wrapped in a TestSuite
+ """
+ testMethodPrefix = 'test'
+ sortTestMethodsUsing = staticmethod(util.three_way_cmp)
+ testNamePatterns = None
+ suiteClass = suite.TestSuite
+ _top_level_dir = None
+
+ def __init__(self):
+ super(TestLoader, self).__init__()
+ self.errors = []
+ # Tracks packages which we have called into via load_tests, to
+ # avoid infinite re-entrancy.
+ self._loading_packages = set()
+
+ def loadTestsFromTestCase(self, testCaseClass):
+ """Return a suite of all test cases contained in testCaseClass"""
+ if issubclass(testCaseClass, suite.TestSuite):
+ raise TypeError("Test cases should not be derived from "
+ "TestSuite. Maybe you meant to derive from "
+ "TestCase?")
+ testCaseNames = self.getTestCaseNames(testCaseClass)
+ if not testCaseNames and hasattr(testCaseClass, 'runTest'):
+ testCaseNames = ['runTest']
+ loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))
+ return loaded_suite
+
+ # XXX After Python 3.5, remove backward compatibility hacks for
+ # use_load_tests deprecation via *args and **kws. See issue 16662.
+ def loadTestsFromModule(self, module, *args, pattern=None, **kws):
+ """Return a suite of all test cases contained in the given module"""
+ # This method used to take an undocumented and unofficial
+ # use_load_tests argument. For backward compatibility, we still
+ # accept the argument (which can also be the first position) but we
+ # ignore it and issue a deprecation warning if it's present.
+ if len(args) > 0 or 'use_load_tests' in kws:
+ warnings.warn('use_load_tests is deprecated and ignored',
+ DeprecationWarning)
+ kws.pop('use_load_tests', None)
+ if len(args) > 1:
+ # Complain about the number of arguments, but don't forget the
+ # required `module` argument.
+ complaint = len(args) + 1
+ raise TypeError('loadTestsFromModule() takes 1 positional argument but {} were given'.format(complaint))
+ if len(kws) != 0:
+ # Since the keyword arguments are unsorted (see PEP 468), just
+ # pick the alphabetically sorted first argument to complain about,
+ # if multiple were given. At least the error message will be
+ # predictable.
+ complaint = sorted(kws)[0]
+ raise TypeError("loadTestsFromModule() got an unexpected keyword argument '{}'".format(complaint))
+ tests = []
+ for name in dir(module):
+ obj = getattr(module, name)
+ if isinstance(obj, type) and issubclass(obj, case.TestCase):
+ tests.append(self.loadTestsFromTestCase(obj))
+
+ load_tests = getattr(module, 'load_tests', None)
+ tests = self.suiteClass(tests)
+ if load_tests is not None:
+ try:
+ return load_tests(self, tests, pattern)
+ except Exception as e:
+ error_case, error_message = _make_failed_load_tests(
+ module.__name__, e, self.suiteClass)
+ self.errors.append(error_message)
+ return error_case
+ return tests
+
+ def loadTestsFromName(self, name, module=None):
+ """Return a suite of all test cases given a string specifier.
+
+ The name may resolve either to a module, a test case class, a
+ test method within a test case class, or a callable object which
+ returns a TestCase or TestSuite instance.
+
+ The method optionally resolves the names relative to a given module.
+ """
+ parts = name.split('.')
+ error_case, error_message = None, None
+ if module is None:
+ parts_copy = parts[:]
+ while parts_copy:
+ try:
+ module_name = '.'.join(parts_copy)
+ module = __import__(module_name)
+ break
+ except ImportError:
+ next_attribute = parts_copy.pop()
+ # Last error so we can give it to the user if needed.
+ error_case, error_message = _make_failed_import_test(
+ next_attribute, self.suiteClass)
+ if not parts_copy:
+ # Even the top level import failed: report that error.
+ self.errors.append(error_message)
+ return error_case
+ parts = parts[1:]
+ obj = module
+ for part in parts:
+ try:
+ parent, obj = obj, getattr(obj, part)
+ except AttributeError as e:
+ # We can't traverse some part of the name.
+ if (getattr(obj, '__path__', None) is not None
+ and error_case is not None):
+ # This is a package (no __path__ per importlib docs), and we
+ # encountered an error importing something. We cannot tell
+ # the difference between package.WrongNameTestClass and
+ # package.wrong_module_name so we just report the
+ # ImportError - it is more informative.
+ self.errors.append(error_message)
+ return error_case
+ else:
+ # Otherwise, we signal that an AttributeError has occurred.
+ error_case, error_message = _make_failed_test(
+ part, e, self.suiteClass,
+ 'Failed to access attribute:\n%s' % (
+ traceback.format_exc(),))
+ self.errors.append(error_message)
+ return error_case
+
+ if isinstance(obj, types.ModuleType):
+ return self.loadTestsFromModule(obj)
+ elif isinstance(obj, type) and issubclass(obj, case.TestCase):
+ return self.loadTestsFromTestCase(obj)
+ elif (isinstance(obj, types.FunctionType) and
+ isinstance(parent, type) and
+ issubclass(parent, case.TestCase)):
+ name = parts[-1]
+ inst = parent(name)
+ # static methods follow a different path
+ if not isinstance(getattr(inst, name), types.FunctionType):
+ return self.suiteClass([inst])
+ elif isinstance(obj, suite.TestSuite):
+ return obj
+ if callable(obj):
+ test = obj()
+ if isinstance(test, suite.TestSuite):
+ return test
+ elif isinstance(test, case.TestCase):
+ return self.suiteClass([test])
+ else:
+ raise TypeError("calling %s returned %s, not a test" %
+ (obj, test))
+ else:
+ raise TypeError("don't know how to make test from: %s" % obj)
+
+ def loadTestsFromNames(self, names, module=None):
+ """Return a suite of all test cases found using the given sequence
+ of string specifiers. See 'loadTestsFromName()'.
+ """
+ suites = [self.loadTestsFromName(name, module) for name in names]
+ return self.suiteClass(suites)
+
+ def getTestCaseNames(self, testCaseClass):
+ """Return a sorted sequence of method names found within testCaseClass
+ """
+ def shouldIncludeMethod(attrname):
+ if not attrname.startswith(self.testMethodPrefix):
+ return False
+ testFunc = getattr(testCaseClass, attrname)
+ if not callable(testFunc):
+ return False
+ fullName = f'%s.%s.%s' % (
+ testCaseClass.__module__, testCaseClass.__qualname__, attrname
+ )
+ return self.testNamePatterns is None or \
+ any(fnmatchcase(fullName, pattern) for pattern in self.testNamePatterns)
+ testFnNames = list(filter(shouldIncludeMethod, dir(testCaseClass)))
+ if self.sortTestMethodsUsing:
+ testFnNames.sort(key=functools.cmp_to_key(self.sortTestMethodsUsing))
+ return testFnNames
+
+ def discover(self, start_dir, pattern='test*.py', top_level_dir=None):
+ """Find and return all test modules from the specified start
+ directory, recursing into subdirectories to find them and return all
+ tests found within them. Only test files that match the pattern will
+ be loaded. (Using shell style pattern matching.)
+
+ All test modules must be importable from the top level of the project.
+ If the start directory is not the top level directory then the top
+ level directory must be specified separately.
+
+ If a test package name (directory with '__init__.py') matches the
+ pattern then the package will be checked for a 'load_tests' function. If
+ this exists then it will be called with (loader, tests, pattern) unless
+ the package has already had load_tests called from the same discovery
+ invocation, in which case the package module object is not scanned for
+ tests - this ensures that when a package uses discover to further
+ discover child tests that infinite recursion does not happen.
+
+ If load_tests exists then discovery does *not* recurse into the package,
+ load_tests is responsible for loading all tests in the package.
+
+ The pattern is deliberately not stored as a loader attribute so that
+ packages can continue discovery themselves. top_level_dir is stored so
+ load_tests does not need to pass this argument in to loader.discover().
+
+ Paths are sorted before being imported to ensure reproducible execution
+ order even on filesystems with non-alphabetical ordering like ext3/4.
+ """
+ set_implicit_top = False
+ if top_level_dir is None and self._top_level_dir is not None:
+ # make top_level_dir optional if called from load_tests in a package
+ top_level_dir = self._top_level_dir
+ elif top_level_dir is None:
+ set_implicit_top = True
+ top_level_dir = start_dir
+
+ top_level_dir = os.path.abspath(top_level_dir)
+
+ if not top_level_dir in sys.path:
+ # all test modules must be importable from the top level directory
+ # should we *unconditionally* put the start directory in first
+ # in sys.path to minimise likelihood of conflicts between installed
+ # modules and development versions?
+ sys.path.insert(0, top_level_dir)
+ self._top_level_dir = top_level_dir
+
+ is_not_importable = False
+ is_namespace = False
+ tests = []
+ if os.path.isdir(os.path.abspath(start_dir)):
+ start_dir = os.path.abspath(start_dir)
+ if start_dir != top_level_dir:
+ is_not_importable = not os.path.isfile(os.path.join(start_dir, '__init__.py'))
+ else:
+ # support for discovery from dotted module names
+ try:
+ __import__(start_dir)
+ except ImportError:
+ is_not_importable = True
+ else:
+ the_module = sys.modules[start_dir]
+ top_part = start_dir.split('.')[0]
+ try:
+ start_dir = os.path.abspath(
+ os.path.dirname((the_module.__file__)))
+ except AttributeError:
+ # look for namespace packages
+ try:
+ spec = the_module.__spec__
+ except AttributeError:
+ spec = None
+
+ if spec and spec.loader is None:
+ if spec.submodule_search_locations is not None:
+ is_namespace = True
+
+ for path in the_module.__path__:
+ if (not set_implicit_top and
+ not path.startswith(top_level_dir)):
+ continue
+ self._top_level_dir = \
+ (path.split(the_module.__name__
+ .replace(".", os.path.sep))[0])
+ tests.extend(self._find_tests(path,
+ pattern,
+ namespace=True))
+ elif the_module.__name__ in sys.builtin_module_names:
+ # builtin module
+ raise TypeError('Can not use builtin modules '
+ 'as dotted module names') from None
+ else:
+ raise TypeError(
+ 'don\'t know how to discover from {!r}'
+ .format(the_module)) from None
+
+ if set_implicit_top:
+ if not is_namespace:
+ self._top_level_dir = \
+ self._get_directory_containing_module(top_part)
+ sys.path.remove(top_level_dir)
+ else:
+ sys.path.remove(top_level_dir)
+
+ if is_not_importable:
+ raise ImportError('Start directory is not importable: %r' % start_dir)
+
+ if not is_namespace:
+ tests = list(self._find_tests(start_dir, pattern))
+ return self.suiteClass(tests)
+
+ def _get_directory_containing_module(self, module_name):
+ module = sys.modules[module_name]
+ full_path = os.path.abspath(module.__file__)
+
+ if os.path.basename(full_path).lower().startswith('__init__.py'):
+ return os.path.dirname(os.path.dirname(full_path))
+ else:
+ # here we have been given a module rather than a package - so
+ # all we can do is search the *same* directory the module is in
+ # should an exception be raised instead
+ return os.path.dirname(full_path)
+
+ def _get_name_from_path(self, path):
+ if path == self._top_level_dir:
+ return '.'
+ path = _jython_aware_splitext(os.path.normpath(path))
+
+ _relpath = os.path.relpath(path, self._top_level_dir)
+ assert not os.path.isabs(_relpath), "Path must be within the project"
+ assert not _relpath.startswith('..'), "Path must be within the project"
+
+ name = _relpath.replace(os.path.sep, '.')
+ return name
+
+ def _get_module_from_name(self, name):
+ __import__(name)
+ return sys.modules[name]
+
+ def _match_path(self, path, full_path, pattern):
+ # override this method to use alternative matching strategy
+ return fnmatch(path, pattern)
+
+ def _find_tests(self, start_dir, pattern, namespace=False):
+ """Used by discovery. Yields test suites it loads."""
+ # Handle the __init__ in this package
+ name = self._get_name_from_path(start_dir)
+ # name is '.' when start_dir == top_level_dir (and top_level_dir is by
+ # definition not a package).
+ if name != '.' and name not in self._loading_packages:
+ # name is in self._loading_packages while we have called into
+ # loadTestsFromModule with name.
+ tests, should_recurse = self._find_test_path(
+ start_dir, pattern, namespace)
+ if tests is not None:
+ yield tests
+ if not should_recurse:
+ # Either an error occurred, or load_tests was used by the
+ # package.
+ return
+ # Handle the contents.
+ paths = sorted(os.listdir(start_dir))
+ for path in paths:
+ full_path = os.path.join(start_dir, path)
+ tests, should_recurse = self._find_test_path(
+ full_path, pattern, namespace)
+ if tests is not None:
+ yield tests
+ if should_recurse:
+ # we found a package that didn't use load_tests.
+ name = self._get_name_from_path(full_path)
+ self._loading_packages.add(name)
+ try:
+ yield from self._find_tests(full_path, pattern, namespace)
+ finally:
+ self._loading_packages.discard(name)
+
+ def _find_test_path(self, full_path, pattern, namespace=False):
+ """Used by discovery.
+
+ Loads tests from a single file, or a directories' __init__.py when
+ passed the directory.
+
+ Returns a tuple (None_or_tests_from_file, should_recurse).
+ """
+ basename = os.path.basename(full_path)
+ if os.path.isfile(full_path):
+ if not VALID_MODULE_NAME.match(basename):
+ # valid Python identifiers only
+ return None, False
+ if not self._match_path(basename, full_path, pattern):
+ return None, False
+ # if the test file matches, load it
+ name = self._get_name_from_path(full_path)
+ try:
+ module = self._get_module_from_name(name)
+ except case.SkipTest as e:
+ return _make_skipped_test(name, e, self.suiteClass), False
+ except:
+ error_case, error_message = \
+ _make_failed_import_test(name, self.suiteClass)
+ self.errors.append(error_message)
+ return error_case, False
+ else:
+ mod_file = os.path.abspath(
+ getattr(module, '__file__', full_path))
+ realpath = _jython_aware_splitext(
+ os.path.realpath(mod_file))
+ fullpath_noext = _jython_aware_splitext(
+ os.path.realpath(full_path))
+ if realpath.lower() != fullpath_noext.lower():
+ module_dir = os.path.dirname(realpath)
+ mod_name = _jython_aware_splitext(
+ os.path.basename(full_path))
+ expected_dir = os.path.dirname(full_path)
+ msg = ("%r module incorrectly imported from %r. Expected "
+ "%r. Is this module globally installed?")
+ raise ImportError(
+ msg % (mod_name, module_dir, expected_dir))
+ return self.loadTestsFromModule(module, pattern=pattern), False
+ elif os.path.isdir(full_path):
+ if (not namespace and
+ not os.path.isfile(os.path.join(full_path, '__init__.py'))):
+ return None, False
+
+ load_tests = None
+ tests = None
+ name = self._get_name_from_path(full_path)
+ try:
+ package = self._get_module_from_name(name)
+ except case.SkipTest as e:
+ return _make_skipped_test(name, e, self.suiteClass), False
+ except:
+ error_case, error_message = \
+ _make_failed_import_test(name, self.suiteClass)
+ self.errors.append(error_message)
+ return error_case, False
+ else:
+ load_tests = getattr(package, 'load_tests', None)
+ # Mark this package as being in load_tests (possibly ;))
+ self._loading_packages.add(name)
+ try:
+ tests = self.loadTestsFromModule(package, pattern=pattern)
+ if load_tests is not None:
+ # loadTestsFromModule(package) has loaded tests for us.
+ return tests, False
+ return tests, True
+ finally:
+ self._loading_packages.discard(name)
+ else:
+ return None, False
+
+
+defaultTestLoader = TestLoader()
+
+
+def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None):
+ loader = TestLoader()
+ loader.sortTestMethodsUsing = sortUsing
+ loader.testMethodPrefix = prefix
+ loader.testNamePatterns = testNamePatterns
+ if suiteClass:
+ loader.suiteClass = suiteClass
+ return loader
+
+def getTestCaseNames(testCaseClass, prefix, sortUsing=util.three_way_cmp, testNamePatterns=None):
+ return _makeLoader(prefix, sortUsing, testNamePatterns=testNamePatterns).getTestCaseNames(testCaseClass)
+
+def makeSuite(testCaseClass, prefix='test', sortUsing=util.three_way_cmp,
+ suiteClass=suite.TestSuite):
+ return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase(
+ testCaseClass)
+
+def findTestCases(module, prefix='test', sortUsing=util.three_way_cmp,
+ suiteClass=suite.TestSuite):
+ return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(\
+ module)
diff --git a/lib/python3.10/unittest/main.py b/lib/python3.10/unittest/main.py
new file mode 100644
index 0000000..e62469a
--- /dev/null
+++ b/lib/python3.10/unittest/main.py
@@ -0,0 +1,275 @@
+"""Unittest main program"""
+
+import sys
+import argparse
+import os
+
+from . import loader, runner
+from .signals import installHandler
+
+__unittest = True
+
+MAIN_EXAMPLES = """\
+Examples:
+ %(prog)s test_module - run tests from test_module
+ %(prog)s module.TestClass - run tests from module.TestClass
+ %(prog)s module.Class.test_method - run specified test method
+ %(prog)s path/to/test_file.py - run tests from test_file.py
+"""
+
+MODULE_EXAMPLES = """\
+Examples:
+ %(prog)s - run default set of tests
+ %(prog)s MyTestSuite - run suite 'MyTestSuite'
+ %(prog)s MyTestCase.testSomething - run MyTestCase.testSomething
+ %(prog)s MyTestCase - run all 'test*' test methods
+ in MyTestCase
+"""
+
+def _convert_name(name):
+ # on Linux / Mac OS X 'foo.PY' is not importable, but on
+ # Windows it is. Simpler to do a case insensitive match
+ # a better check would be to check that the name is a
+ # valid Python module name.
+ if os.path.isfile(name) and name.lower().endswith('.py'):
+ if os.path.isabs(name):
+ rel_path = os.path.relpath(name, os.getcwd())
+ if os.path.isabs(rel_path) or rel_path.startswith(os.pardir):
+ return name
+ name = rel_path
+ # on Windows both '\' and '/' are used as path
+ # separators. Better to replace both than rely on os.path.sep
+ return name[:-3].replace('\\', '.').replace('/', '.')
+ return name
+
+def _convert_names(names):
+ return [_convert_name(name) for name in names]
+
+
+def _convert_select_pattern(pattern):
+ if not '*' in pattern:
+ pattern = '*%s*' % pattern
+ return pattern
+
+
+class TestProgram(object):
+ """A command-line program that runs a set of tests; this is primarily
+ for making test modules conveniently executable.
+ """
+ # defaults for testing
+ module=None
+ verbosity = 1
+ failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None
+ _discovery_parser = None
+
+ def __init__(self, module='__main__', defaultTest=None, argv=None,
+ testRunner=None, testLoader=loader.defaultTestLoader,
+ exit=True, verbosity=1, failfast=None, catchbreak=None,
+ buffer=None, warnings=None, *, tb_locals=False):
+ if isinstance(module, str):
+ self.module = __import__(module)
+ for part in module.split('.')[1:]:
+ self.module = getattr(self.module, part)
+ else:
+ self.module = module
+ if argv is None:
+ argv = sys.argv
+
+ self.exit = exit
+ self.failfast = failfast
+ self.catchbreak = catchbreak
+ self.verbosity = verbosity
+ self.buffer = buffer
+ self.tb_locals = tb_locals
+ if warnings is None and not sys.warnoptions:
+ # even if DeprecationWarnings are ignored by default
+ # print them anyway unless other warnings settings are
+ # specified by the warnings arg or the -W python flag
+ self.warnings = 'default'
+ else:
+ # here self.warnings is set either to the value passed
+ # to the warnings args or to None.
+ # If the user didn't pass a value self.warnings will
+ # be None. This means that the behavior is unchanged
+ # and depends on the values passed to -W.
+ self.warnings = warnings
+ self.defaultTest = defaultTest
+ self.testRunner = testRunner
+ self.testLoader = testLoader
+ self.progName = os.path.basename(argv[0])
+ self.parseArgs(argv)
+ self.runTests()
+
+ def usageExit(self, msg=None):
+ if msg:
+ print(msg)
+ if self._discovery_parser is None:
+ self._initArgParsers()
+ self._print_help()
+ sys.exit(2)
+
+ def _print_help(self, *args, **kwargs):
+ if self.module is None:
+ print(self._main_parser.format_help())
+ print(MAIN_EXAMPLES % {'prog': self.progName})
+ self._discovery_parser.print_help()
+ else:
+ print(self._main_parser.format_help())
+ print(MODULE_EXAMPLES % {'prog': self.progName})
+
+ def parseArgs(self, argv):
+ self._initArgParsers()
+ if self.module is None:
+ if len(argv) > 1 and argv[1].lower() == 'discover':
+ self._do_discovery(argv[2:])
+ return
+ self._main_parser.parse_args(argv[1:], self)
+ if not self.tests:
+ # this allows "python -m unittest -v" to still work for
+ # test discovery.
+ self._do_discovery([])
+ return
+ else:
+ self._main_parser.parse_args(argv[1:], self)
+
+ if self.tests:
+ self.testNames = _convert_names(self.tests)
+ if __name__ == '__main__':
+ # to support python -m unittest ...
+ self.module = None
+ elif self.defaultTest is None:
+ # createTests will load tests from self.module
+ self.testNames = None
+ elif isinstance(self.defaultTest, str):
+ self.testNames = (self.defaultTest,)
+ else:
+ self.testNames = list(self.defaultTest)
+ self.createTests()
+
+ def createTests(self, from_discovery=False, Loader=None):
+ if self.testNamePatterns:
+ self.testLoader.testNamePatterns = self.testNamePatterns
+ if from_discovery:
+ loader = self.testLoader if Loader is None else Loader()
+ self.test = loader.discover(self.start, self.pattern, self.top)
+ elif self.testNames is None:
+ self.test = self.testLoader.loadTestsFromModule(self.module)
+ else:
+ self.test = self.testLoader.loadTestsFromNames(self.testNames,
+ self.module)
+
+ def _initArgParsers(self):
+ parent_parser = self._getParentArgParser()
+ self._main_parser = self._getMainArgParser(parent_parser)
+ self._discovery_parser = self._getDiscoveryArgParser(parent_parser)
+
+ def _getParentArgParser(self):
+ parser = argparse.ArgumentParser(add_help=False)
+
+ parser.add_argument('-v', '--verbose', dest='verbosity',
+ action='store_const', const=2,
+ help='Verbose output')
+ parser.add_argument('-q', '--quiet', dest='verbosity',
+ action='store_const', const=0,
+ help='Quiet output')
+ parser.add_argument('--locals', dest='tb_locals',
+ action='store_true',
+ help='Show local variables in tracebacks')
+ if self.failfast is None:
+ parser.add_argument('-f', '--failfast', dest='failfast',
+ action='store_true',
+ help='Stop on first fail or error')
+ self.failfast = False
+ if self.catchbreak is None:
+ parser.add_argument('-c', '--catch', dest='catchbreak',
+ action='store_true',
+ help='Catch Ctrl-C and display results so far')
+ self.catchbreak = False
+ if self.buffer is None:
+ parser.add_argument('-b', '--buffer', dest='buffer',
+ action='store_true',
+ help='Buffer stdout and stderr during tests')
+ self.buffer = False
+ if self.testNamePatterns is None:
+ parser.add_argument('-k', dest='testNamePatterns',
+ action='append', type=_convert_select_pattern,
+ help='Only run tests which match the given substring')
+ self.testNamePatterns = []
+
+ return parser
+
+ def _getMainArgParser(self, parent):
+ parser = argparse.ArgumentParser(parents=[parent])
+ parser.prog = self.progName
+ parser.print_help = self._print_help
+
+ parser.add_argument('tests', nargs='*',
+ help='a list of any number of test modules, '
+ 'classes and test methods.')
+
+ return parser
+
+ def _getDiscoveryArgParser(self, parent):
+ parser = argparse.ArgumentParser(parents=[parent])
+ parser.prog = '%s discover' % self.progName
+ parser.epilog = ('For test discovery all test modules must be '
+ 'importable from the top level directory of the '
+ 'project.')
+
+ parser.add_argument('-s', '--start-directory', dest='start',
+ help="Directory to start discovery ('.' default)")
+ parser.add_argument('-p', '--pattern', dest='pattern',
+ help="Pattern to match tests ('test*.py' default)")
+ parser.add_argument('-t', '--top-level-directory', dest='top',
+ help='Top level directory of project (defaults to '
+ 'start directory)')
+ for arg in ('start', 'pattern', 'top'):
+ parser.add_argument(arg, nargs='?',
+ default=argparse.SUPPRESS,
+ help=argparse.SUPPRESS)
+
+ return parser
+
+ def _do_discovery(self, argv, Loader=None):
+ self.start = '.'
+ self.pattern = 'test*.py'
+ self.top = None
+ if argv is not None:
+ # handle command line args for test discovery
+ if self._discovery_parser is None:
+ # for testing
+ self._initArgParsers()
+ self._discovery_parser.parse_args(argv, self)
+
+ self.createTests(from_discovery=True, Loader=Loader)
+
+ def runTests(self):
+ if self.catchbreak:
+ installHandler()
+ if self.testRunner is None:
+ self.testRunner = runner.TextTestRunner
+ if isinstance(self.testRunner, type):
+ try:
+ try:
+ testRunner = self.testRunner(verbosity=self.verbosity,
+ failfast=self.failfast,
+ buffer=self.buffer,
+ warnings=self.warnings,
+ tb_locals=self.tb_locals)
+ except TypeError:
+ # didn't accept the tb_locals argument
+ testRunner = self.testRunner(verbosity=self.verbosity,
+ failfast=self.failfast,
+ buffer=self.buffer,
+ warnings=self.warnings)
+ except TypeError:
+ # didn't accept the verbosity, buffer or failfast arguments
+ testRunner = self.testRunner()
+ else:
+ # it is assumed to be a TestRunner instance
+ testRunner = self.testRunner
+ self.result = testRunner.run(self.test)
+ if self.exit:
+ sys.exit(not self.result.wasSuccessful())
+
+main = TestProgram
diff --git a/lib/python3.10/unittest/mock.py b/lib/python3.10/unittest/mock.py
new file mode 100644
index 0000000..7152f86
--- /dev/null
+++ b/lib/python3.10/unittest/mock.py
@@ -0,0 +1,2950 @@
+# mock.py
+# Test tools for mocking and patching.
+# Maintained by Michael Foord
+# Backport for other versions of Python available from
+# https://pypi.org/project/mock
+
+__all__ = (
+ 'Mock',
+ 'MagicMock',
+ 'patch',
+ 'sentinel',
+ 'DEFAULT',
+ 'ANY',
+ 'call',
+ 'create_autospec',
+ 'AsyncMock',
+ 'FILTER_DIR',
+ 'NonCallableMock',
+ 'NonCallableMagicMock',
+ 'mock_open',
+ 'PropertyMock',
+ 'seal',
+)
+
+
+import asyncio
+import contextlib
+import io
+import inspect
+import pprint
+import sys
+import builtins
+from asyncio import iscoroutinefunction
+from types import CodeType, ModuleType, MethodType
+from unittest.util import safe_repr
+from functools import wraps, partial
+
+
+class InvalidSpecError(Exception):
+ """Indicates that an invalid value was used as a mock spec."""
+
+
+_builtins = {name for name in dir(builtins) if not name.startswith('_')}
+
+FILTER_DIR = True
+
+# Workaround for issue #12370
+# Without this, the __class__ properties wouldn't be set correctly
+_safe_super = super
+
+def _is_async_obj(obj):
+ if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
+ return False
+ if hasattr(obj, '__func__'):
+ obj = getattr(obj, '__func__')
+ return iscoroutinefunction(obj) or inspect.isawaitable(obj)
+
+
+def _is_async_func(func):
+ if getattr(func, '__code__', None):
+ return iscoroutinefunction(func)
+ else:
+ return False
+
+
+def _is_instance_mock(obj):
+ # can't use isinstance on Mock objects because they override __class__
+ # The base class for all mocks is NonCallableMock
+ return issubclass(type(obj), NonCallableMock)
+
+
+def _is_exception(obj):
+ return (
+ isinstance(obj, BaseException) or
+ isinstance(obj, type) and issubclass(obj, BaseException)
+ )
+
+
+def _extract_mock(obj):
+ # Autospecced functions will return a FunctionType with "mock" attribute
+ # which is the actual mock object that needs to be used.
+ if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
+ return obj.mock
+ else:
+ return obj
+
+
+def _get_signature_object(func, as_instance, eat_self):
+ """
+ Given an arbitrary, possibly callable object, try to create a suitable
+ signature object.
+ Return a (reduced func, signature) tuple, or None.
+ """
+ if isinstance(func, type) and not as_instance:
+ # If it's a type and should be modelled as a type, use __init__.
+ func = func.__init__
+ # Skip the `self` argument in __init__
+ eat_self = True
+ elif not isinstance(func, FunctionTypes):
+ # If we really want to model an instance of the passed type,
+ # __call__ should be looked up, not __init__.
+ try:
+ func = func.__call__
+ except AttributeError:
+ return None
+ if eat_self:
+ sig_func = partial(func, None)
+ else:
+ sig_func = func
+ try:
+ return func, inspect.signature(sig_func)
+ except ValueError:
+ # Certain callable types are not supported by inspect.signature()
+ return None
+
+
+def _check_signature(func, mock, skipfirst, instance=False):
+ sig = _get_signature_object(func, instance, skipfirst)
+ if sig is None:
+ return
+ func, sig = sig
+ def checksig(self, /, *args, **kwargs):
+ sig.bind(*args, **kwargs)
+ _copy_func_details(func, checksig)
+ type(mock)._mock_check_sig = checksig
+ type(mock).__signature__ = sig
+
+
+def _copy_func_details(func, funcopy):
+ # we explicitly don't copy func.__dict__ into this copy as it would
+ # expose original attributes that should be mocked
+ for attribute in (
+ '__name__', '__doc__', '__text_signature__',
+ '__module__', '__defaults__', '__kwdefaults__',
+ ):
+ try:
+ setattr(funcopy, attribute, getattr(func, attribute))
+ except AttributeError:
+ pass
+
+
+def _callable(obj):
+ if isinstance(obj, type):
+ return True
+ if isinstance(obj, (staticmethod, classmethod, MethodType)):
+ return _callable(obj.__func__)
+ if getattr(obj, '__call__', None) is not None:
+ return True
+ return False
+
+
+def _is_list(obj):
+ # checks for list or tuples
+ # XXXX badly named!
+ return type(obj) in (list, tuple)
+
+
+def _instance_callable(obj):
+ """Given an object, return True if the object is callable.
+ For classes, return True if instances would be callable."""
+ if not isinstance(obj, type):
+ # already an instance
+ return getattr(obj, '__call__', None) is not None
+
+ # *could* be broken by a class overriding __mro__ or __dict__ via
+ # a metaclass
+ for base in (obj,) + obj.__mro__:
+ if base.__dict__.get('__call__') is not None:
+ return True
+ return False
+
+
+def _set_signature(mock, original, instance=False):
+ # creates a function with signature (*args, **kwargs) that delegates to a
+ # mock. It still does signature checking by calling a lambda with the same
+ # signature as the original.
+
+ skipfirst = isinstance(original, type)
+ result = _get_signature_object(original, instance, skipfirst)
+ if result is None:
+ return mock
+ func, sig = result
+ def checksig(*args, **kwargs):
+ sig.bind(*args, **kwargs)
+ _copy_func_details(func, checksig)
+
+ name = original.__name__
+ if not name.isidentifier():
+ name = 'funcopy'
+ context = {'_checksig_': checksig, 'mock': mock}
+ src = """def %s(*args, **kwargs):
+ _checksig_(*args, **kwargs)
+ return mock(*args, **kwargs)""" % name
+ exec (src, context)
+ funcopy = context[name]
+ _setup_func(funcopy, mock, sig)
+ return funcopy
+
+
+def _setup_func(funcopy, mock, sig):
+ funcopy.mock = mock
+
+ def assert_called_with(*args, **kwargs):
+ return mock.assert_called_with(*args, **kwargs)
+ def assert_called(*args, **kwargs):
+ return mock.assert_called(*args, **kwargs)
+ def assert_not_called(*args, **kwargs):
+ return mock.assert_not_called(*args, **kwargs)
+ def assert_called_once(*args, **kwargs):
+ return mock.assert_called_once(*args, **kwargs)
+ def assert_called_once_with(*args, **kwargs):
+ return mock.assert_called_once_with(*args, **kwargs)
+ def assert_has_calls(*args, **kwargs):
+ return mock.assert_has_calls(*args, **kwargs)
+ def assert_any_call(*args, **kwargs):
+ return mock.assert_any_call(*args, **kwargs)
+ def reset_mock():
+ funcopy.method_calls = _CallList()
+ funcopy.mock_calls = _CallList()
+ mock.reset_mock()
+ ret = funcopy.return_value
+ if _is_instance_mock(ret) and not ret is mock:
+ ret.reset_mock()
+
+ funcopy.called = False
+ funcopy.call_count = 0
+ funcopy.call_args = None
+ funcopy.call_args_list = _CallList()
+ funcopy.method_calls = _CallList()
+ funcopy.mock_calls = _CallList()
+
+ funcopy.return_value = mock.return_value
+ funcopy.side_effect = mock.side_effect
+ funcopy._mock_children = mock._mock_children
+
+ funcopy.assert_called_with = assert_called_with
+ funcopy.assert_called_once_with = assert_called_once_with
+ funcopy.assert_has_calls = assert_has_calls
+ funcopy.assert_any_call = assert_any_call
+ funcopy.reset_mock = reset_mock
+ funcopy.assert_called = assert_called
+ funcopy.assert_not_called = assert_not_called
+ funcopy.assert_called_once = assert_called_once
+ funcopy.__signature__ = sig
+
+ mock._mock_delegate = funcopy
+
+
+def _setup_async_mock(mock):
+ mock._is_coroutine = asyncio.coroutines._is_coroutine
+ mock.await_count = 0
+ mock.await_args = None
+ mock.await_args_list = _CallList()
+
+ # Mock is not configured yet so the attributes are set
+ # to a function and then the corresponding mock helper function
+ # is called when the helper is accessed similar to _setup_func.
+ def wrapper(attr, /, *args, **kwargs):
+ return getattr(mock.mock, attr)(*args, **kwargs)
+
+ for attribute in ('assert_awaited',
+ 'assert_awaited_once',
+ 'assert_awaited_with',
+ 'assert_awaited_once_with',
+ 'assert_any_await',
+ 'assert_has_awaits',
+ 'assert_not_awaited'):
+
+ # setattr(mock, attribute, wrapper) causes late binding
+ # hence attribute will always be the last value in the loop
+ # Use partial(wrapper, attribute) to ensure the attribute is bound
+ # correctly.
+ setattr(mock, attribute, partial(wrapper, attribute))
+
+
+def _is_magic(name):
+ return '__%s__' % name[2:-2] == name
+
+
+class _SentinelObject(object):
+ "A unique, named, sentinel object."
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return 'sentinel.%s' % self.name
+
+ def __reduce__(self):
+ return 'sentinel.%s' % self.name
+
+
+class _Sentinel(object):
+ """Access attributes to return a named object, usable as a sentinel."""
+ def __init__(self):
+ self._sentinels = {}
+
+ def __getattr__(self, name):
+ if name == '__bases__':
+ # Without this help(unittest.mock) raises an exception
+ raise AttributeError
+ return self._sentinels.setdefault(name, _SentinelObject(name))
+
+ def __reduce__(self):
+ return 'sentinel'
+
+
+sentinel = _Sentinel()
+
+DEFAULT = sentinel.DEFAULT
+_missing = sentinel.MISSING
+_deleted = sentinel.DELETED
+
+
+_allowed_names = {
+ 'return_value', '_mock_return_value', 'side_effect',
+ '_mock_side_effect', '_mock_parent', '_mock_new_parent',
+ '_mock_name', '_mock_new_name'
+}
+
+
+def _delegating_property(name):
+ _allowed_names.add(name)
+ _the_name = '_mock_' + name
+ def _get(self, name=name, _the_name=_the_name):
+ sig = self._mock_delegate
+ if sig is None:
+ return getattr(self, _the_name)
+ return getattr(sig, name)
+ def _set(self, value, name=name, _the_name=_the_name):
+ sig = self._mock_delegate
+ if sig is None:
+ self.__dict__[_the_name] = value
+ else:
+ setattr(sig, name, value)
+
+ return property(_get, _set)
+
+
+
+class _CallList(list):
+
+ def __contains__(self, value):
+ if not isinstance(value, list):
+ return list.__contains__(self, value)
+ len_value = len(value)
+ len_self = len(self)
+ if len_value > len_self:
+ return False
+
+ for i in range(0, len_self - len_value + 1):
+ sub_list = self[i:i+len_value]
+ if sub_list == value:
+ return True
+ return False
+
+ def __repr__(self):
+ return pprint.pformat(list(self))
+
+
+def _check_and_set_parent(parent, value, name, new_name):
+ value = _extract_mock(value)
+
+ if not _is_instance_mock(value):
+ return False
+ if ((value._mock_name or value._mock_new_name) or
+ (value._mock_parent is not None) or
+ (value._mock_new_parent is not None)):
+ return False
+
+ _parent = parent
+ while _parent is not None:
+ # setting a mock (value) as a child or return value of itself
+ # should not modify the mock
+ if _parent is value:
+ return False
+ _parent = _parent._mock_new_parent
+
+ if new_name:
+ value._mock_new_parent = parent
+ value._mock_new_name = new_name
+ if name:
+ value._mock_parent = parent
+ value._mock_name = name
+ return True
+
+# Internal class to identify if we wrapped an iterator object or not.
+class _MockIter(object):
+ def __init__(self, obj):
+ self.obj = iter(obj)
+ def __next__(self):
+ return next(self.obj)
+
+class Base(object):
+ _mock_return_value = DEFAULT
+ _mock_side_effect = None
+ def __init__(self, /, *args, **kwargs):
+ pass
+
+
+
+class NonCallableMock(Base):
+ """A non-callable version of `Mock`"""
+
+ def __new__(cls, /, *args, **kw):
+ # every instance has its own class
+ # so we can create magic methods on the
+ # class without stomping on other mocks
+ bases = (cls,)
+ if not issubclass(cls, AsyncMockMixin):
+ # Check if spec is an async object or function
+ bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
+ spec_arg = bound_args.get('spec_set', bound_args.get('spec'))
+ if spec_arg is not None and _is_async_obj(spec_arg):
+ bases = (AsyncMockMixin, cls)
+ new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
+ instance = _safe_super(NonCallableMock, cls).__new__(new)
+ return instance
+
+
+ def __init__(
+ self, spec=None, wraps=None, name=None, spec_set=None,
+ parent=None, _spec_state=None, _new_name='', _new_parent=None,
+ _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
+ ):
+ if _new_parent is None:
+ _new_parent = parent
+
+ __dict__ = self.__dict__
+ __dict__['_mock_parent'] = parent
+ __dict__['_mock_name'] = name
+ __dict__['_mock_new_name'] = _new_name
+ __dict__['_mock_new_parent'] = _new_parent
+ __dict__['_mock_sealed'] = False
+
+ if spec_set is not None:
+ spec = spec_set
+ spec_set = True
+ if _eat_self is None:
+ _eat_self = parent is not None
+
+ self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
+
+ __dict__['_mock_children'] = {}
+ __dict__['_mock_wraps'] = wraps
+ __dict__['_mock_delegate'] = None
+
+ __dict__['_mock_called'] = False
+ __dict__['_mock_call_args'] = None
+ __dict__['_mock_call_count'] = 0
+ __dict__['_mock_call_args_list'] = _CallList()
+ __dict__['_mock_mock_calls'] = _CallList()
+
+ __dict__['method_calls'] = _CallList()
+ __dict__['_mock_unsafe'] = unsafe
+
+ if kwargs:
+ self.configure_mock(**kwargs)
+
+ _safe_super(NonCallableMock, self).__init__(
+ spec, wraps, name, spec_set, parent,
+ _spec_state
+ )
+
+
+ def attach_mock(self, mock, attribute):
+ """
+ Attach a mock as an attribute of this one, replacing its name and
+ parent. Calls to the attached mock will be recorded in the
+ `method_calls` and `mock_calls` attributes of this one."""
+ inner_mock = _extract_mock(mock)
+
+ inner_mock._mock_parent = None
+ inner_mock._mock_new_parent = None
+ inner_mock._mock_name = ''
+ inner_mock._mock_new_name = None
+
+ setattr(self, attribute, mock)
+
+
+ def mock_add_spec(self, spec, spec_set=False):
+ """Add a spec to a mock. `spec` can either be an object or a
+ list of strings. Only attributes on the `spec` can be fetched as
+ attributes from the mock.
+
+ If `spec_set` is True then only attributes on the spec can be set."""
+ self._mock_add_spec(spec, spec_set)
+
+
+ def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
+ _eat_self=False):
+ _spec_class = None
+ _spec_signature = None
+ _spec_asyncs = []
+
+ for attr in dir(spec):
+ if iscoroutinefunction(getattr(spec, attr, None)):
+ _spec_asyncs.append(attr)
+
+ if spec is not None and not _is_list(spec):
+ if isinstance(spec, type):
+ _spec_class = spec
+ else:
+ _spec_class = type(spec)
+ res = _get_signature_object(spec,
+ _spec_as_instance, _eat_self)
+ _spec_signature = res and res[1]
+
+ spec = dir(spec)
+
+ __dict__ = self.__dict__
+ __dict__['_spec_class'] = _spec_class
+ __dict__['_spec_set'] = spec_set
+ __dict__['_spec_signature'] = _spec_signature
+ __dict__['_mock_methods'] = spec
+ __dict__['_spec_asyncs'] = _spec_asyncs
+
+ def __get_return_value(self):
+ ret = self._mock_return_value
+ if self._mock_delegate is not None:
+ ret = self._mock_delegate.return_value
+
+ if ret is DEFAULT:
+ ret = self._get_child_mock(
+ _new_parent=self, _new_name='()'
+ )
+ self.return_value = ret
+ return ret
+
+
+ def __set_return_value(self, value):
+ if self._mock_delegate is not None:
+ self._mock_delegate.return_value = value
+ else:
+ self._mock_return_value = value
+ _check_and_set_parent(self, value, None, '()')
+
+ __return_value_doc = "The value to be returned when the mock is called."
+ return_value = property(__get_return_value, __set_return_value,
+ __return_value_doc)
+
+
+ @property
+ def __class__(self):
+ if self._spec_class is None:
+ return type(self)
+ return self._spec_class
+
+ called = _delegating_property('called')
+ call_count = _delegating_property('call_count')
+ call_args = _delegating_property('call_args')
+ call_args_list = _delegating_property('call_args_list')
+ mock_calls = _delegating_property('mock_calls')
+
+
+ def __get_side_effect(self):
+ delegated = self._mock_delegate
+ if delegated is None:
+ return self._mock_side_effect
+ sf = delegated.side_effect
+ if (sf is not None and not callable(sf)
+ and not isinstance(sf, _MockIter) and not _is_exception(sf)):
+ sf = _MockIter(sf)
+ delegated.side_effect = sf
+ return sf
+
+ def __set_side_effect(self, value):
+ value = _try_iter(value)
+ delegated = self._mock_delegate
+ if delegated is None:
+ self._mock_side_effect = value
+ else:
+ delegated.side_effect = value
+
+ side_effect = property(__get_side_effect, __set_side_effect)
+
+
+ def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
+ "Restore the mock object to its initial state."
+ if visited is None:
+ visited = []
+ if id(self) in visited:
+ return
+ visited.append(id(self))
+
+ self.called = False
+ self.call_args = None
+ self.call_count = 0
+ self.mock_calls = _CallList()
+ self.call_args_list = _CallList()
+ self.method_calls = _CallList()
+
+ if return_value:
+ self._mock_return_value = DEFAULT
+ if side_effect:
+ self._mock_side_effect = None
+
+ for child in self._mock_children.values():
+ if isinstance(child, _SpecState) or child is _deleted:
+ continue
+ child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
+
+ ret = self._mock_return_value
+ if _is_instance_mock(ret) and ret is not self:
+ ret.reset_mock(visited)
+
+
+ def configure_mock(self, /, **kwargs):
+ """Set attributes on the mock through keyword arguments.
+
+ Attributes plus return values and side effects can be set on child
+ mocks using standard dot notation and unpacking a dictionary in the
+ method call:
+
+ >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
+ >>> mock.configure_mock(**attrs)"""
+ for arg, val in sorted(kwargs.items(),
+ # we sort on the number of dots so that
+ # attributes are set before we set attributes on
+ # attributes
+ key=lambda entry: entry[0].count('.')):
+ args = arg.split('.')
+ final = args.pop()
+ obj = self
+ for entry in args:
+ obj = getattr(obj, entry)
+ setattr(obj, final, val)
+
+
+ def __getattr__(self, name):
+ if name in {'_mock_methods', '_mock_unsafe'}:
+ raise AttributeError(name)
+ elif self._mock_methods is not None:
+ if name not in self._mock_methods or name in _all_magics:
+ raise AttributeError("Mock object has no attribute %r" % name)
+ elif _is_magic(name):
+ raise AttributeError(name)
+ if not self._mock_unsafe:
+ if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')):
+ raise AttributeError(
+ f"{name!r} is not a valid assertion. Use a spec "
+ f"for the mock if {name!r} is meant to be an attribute.")
+
+ result = self._mock_children.get(name)
+ if result is _deleted:
+ raise AttributeError(name)
+ elif result is None:
+ wraps = None
+ if self._mock_wraps is not None:
+ # XXXX should we get the attribute without triggering code
+ # execution?
+ wraps = getattr(self._mock_wraps, name)
+
+ result = self._get_child_mock(
+ parent=self, name=name, wraps=wraps, _new_name=name,
+ _new_parent=self
+ )
+ self._mock_children[name] = result
+
+ elif isinstance(result, _SpecState):
+ try:
+ result = create_autospec(
+ result.spec, result.spec_set, result.instance,
+ result.parent, result.name
+ )
+ except InvalidSpecError:
+ target_name = self.__dict__['_mock_name'] or self
+ raise InvalidSpecError(
+ f'Cannot autospec attr {name!r} from target '
+ f'{target_name!r} as it has already been mocked out. '
+ f'[target={self!r}, attr={result.spec!r}]')
+ self._mock_children[name] = result
+
+ return result
+
+
+ def _extract_mock_name(self):
+ _name_list = [self._mock_new_name]
+ _parent = self._mock_new_parent
+ last = self
+
+ dot = '.'
+ if _name_list == ['()']:
+ dot = ''
+
+ while _parent is not None:
+ last = _parent
+
+ _name_list.append(_parent._mock_new_name + dot)
+ dot = '.'
+ if _parent._mock_new_name == '()':
+ dot = ''
+
+ _parent = _parent._mock_new_parent
+
+ _name_list = list(reversed(_name_list))
+ _first = last._mock_name or 'mock'
+ if len(_name_list) > 1:
+ if _name_list[1] not in ('()', '().'):
+ _first += '.'
+ _name_list[0] = _first
+ return ''.join(_name_list)
+
+ def __repr__(self):
+ name = self._extract_mock_name()
+
+ name_string = ''
+ if name not in ('mock', 'mock.'):
+ name_string = ' name=%r' % name
+
+ spec_string = ''
+ if self._spec_class is not None:
+ spec_string = ' spec=%r'
+ if self._spec_set:
+ spec_string = ' spec_set=%r'
+ spec_string = spec_string % self._spec_class.__name__
+ return "<%s%s%s id='%s'>" % (
+ type(self).__name__,
+ name_string,
+ spec_string,
+ id(self)
+ )
+
+
+ def __dir__(self):
+ """Filter the output of `dir(mock)` to only useful members."""
+ if not FILTER_DIR:
+ return object.__dir__(self)
+
+ extras = self._mock_methods or []
+ from_type = dir(type(self))
+ from_dict = list(self.__dict__)
+ from_child_mocks = [
+ m_name for m_name, m_value in self._mock_children.items()
+ if m_value is not _deleted]
+
+ from_type = [e for e in from_type if not e.startswith('_')]
+ from_dict = [e for e in from_dict if not e.startswith('_') or
+ _is_magic(e)]
+ return sorted(set(extras + from_type + from_dict + from_child_mocks))
+
+
+ def __setattr__(self, name, value):
+ if name in _allowed_names:
+ # property setters go through here
+ return object.__setattr__(self, name, value)
+ elif (self._spec_set and self._mock_methods is not None and
+ name not in self._mock_methods and
+ name not in self.__dict__):
+ raise AttributeError("Mock object has no attribute '%s'" % name)
+ elif name in _unsupported_magics:
+ msg = 'Attempting to set unsupported magic method %r.' % name
+ raise AttributeError(msg)
+ elif name in _all_magics:
+ if self._mock_methods is not None and name not in self._mock_methods:
+ raise AttributeError("Mock object has no attribute '%s'" % name)
+
+ if not _is_instance_mock(value):
+ setattr(type(self), name, _get_method(name, value))
+ original = value
+ value = lambda *args, **kw: original(self, *args, **kw)
+ else:
+ # only set _new_name and not name so that mock_calls is tracked
+ # but not method calls
+ _check_and_set_parent(self, value, None, name)
+ setattr(type(self), name, value)
+ self._mock_children[name] = value
+ elif name == '__class__':
+ self._spec_class = value
+ return
+ else:
+ if _check_and_set_parent(self, value, name, name):
+ self._mock_children[name] = value
+
+ if self._mock_sealed and not hasattr(self, name):
+ mock_name = f'{self._extract_mock_name()}.{name}'
+ raise AttributeError(f'Cannot set {mock_name}')
+
+ return object.__setattr__(self, name, value)
+
+
+ def __delattr__(self, name):
+ if name in _all_magics and name in type(self).__dict__:
+ delattr(type(self), name)
+ if name not in self.__dict__:
+ # for magic methods that are still MagicProxy objects and
+ # not set on the instance itself
+ return
+
+ obj = self._mock_children.get(name, _missing)
+ if name in self.__dict__:
+ _safe_super(NonCallableMock, self).__delattr__(name)
+ elif obj is _deleted:
+ raise AttributeError(name)
+ if obj is not _missing:
+ del self._mock_children[name]
+ self._mock_children[name] = _deleted
+
+
+ def _format_mock_call_signature(self, args, kwargs):
+ name = self._mock_name or 'mock'
+ return _format_call_signature(name, args, kwargs)
+
+
+ def _format_mock_failure_message(self, args, kwargs, action='call'):
+ message = 'expected %s not found.\nExpected: %s\nActual: %s'
+ expected_string = self._format_mock_call_signature(args, kwargs)
+ call_args = self.call_args
+ actual_string = self._format_mock_call_signature(*call_args)
+ return message % (action, expected_string, actual_string)
+
+
+ def _get_call_signature_from_name(self, name):
+ """
+ * If call objects are asserted against a method/function like obj.meth1
+ then there could be no name for the call object to lookup. Hence just
+ return the spec_signature of the method/function being asserted against.
+ * If the name is not empty then remove () and split by '.' to get
+ list of names to iterate through the children until a potential
+ match is found. A child mock is created only during attribute access
+ so if we get a _SpecState then no attributes of the spec were accessed
+ and can be safely exited.
+ """
+ if not name:
+ return self._spec_signature
+
+ sig = None
+ names = name.replace('()', '').split('.')
+ children = self._mock_children
+
+ for name in names:
+ child = children.get(name)
+ if child is None or isinstance(child, _SpecState):
+ break
+ else:
+ # If an autospecced object is attached using attach_mock the
+ # child would be a function with mock object as attribute from
+ # which signature has to be derived.
+ child = _extract_mock(child)
+ children = child._mock_children
+ sig = child._spec_signature
+
+ return sig
+
+
+ def _call_matcher(self, _call):
+ """
+ Given a call (or simply an (args, kwargs) tuple), return a
+ comparison key suitable for matching with other calls.
+ This is a best effort method which relies on the spec's signature,
+ if available, or falls back on the arguments themselves.
+ """
+
+ if isinstance(_call, tuple) and len(_call) > 2:
+ sig = self._get_call_signature_from_name(_call[0])
+ else:
+ sig = self._spec_signature
+
+ if sig is not None:
+ if len(_call) == 2:
+ name = ''
+ args, kwargs = _call
+ else:
+ name, args, kwargs = _call
+ try:
+ bound_call = sig.bind(*args, **kwargs)
+ return call(name, bound_call.args, bound_call.kwargs)
+ except TypeError as e:
+ return e.with_traceback(None)
+ else:
+ return _call
+
+ def assert_not_called(self):
+ """assert that the mock was never called.
+ """
+ if self.call_count != 0:
+ msg = ("Expected '%s' to not have been called. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
+ raise AssertionError(msg)
+
+ def assert_called(self):
+ """assert that the mock was called at least once
+ """
+ if self.call_count == 0:
+ msg = ("Expected '%s' to have been called." %
+ (self._mock_name or 'mock'))
+ raise AssertionError(msg)
+
+ def assert_called_once(self):
+ """assert that the mock was called only once.
+ """
+ if not self.call_count == 1:
+ msg = ("Expected '%s' to have been called once. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
+ raise AssertionError(msg)
+
+ def assert_called_with(self, /, *args, **kwargs):
+ """assert that the last call was made with the specified arguments.
+
+ Raises an AssertionError if the args and keyword args passed in are
+ different to the last call to the mock."""
+ if self.call_args is None:
+ expected = self._format_mock_call_signature(args, kwargs)
+ actual = 'not called.'
+ error_message = ('expected call not found.\nExpected: %s\nActual: %s'
+ % (expected, actual))
+ raise AssertionError(error_message)
+
+ def _error_message():
+ msg = self._format_mock_failure_message(args, kwargs)
+ return msg
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ actual = self._call_matcher(self.call_args)
+ if actual != expected:
+ cause = expected if isinstance(expected, Exception) else None
+ raise AssertionError(_error_message()) from cause
+
+
+ def assert_called_once_with(self, /, *args, **kwargs):
+ """assert that the mock was called exactly once and that that call was
+ with the specified arguments."""
+ if not self.call_count == 1:
+ msg = ("Expected '%s' to be called once. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
+ raise AssertionError(msg)
+ return self.assert_called_with(*args, **kwargs)
+
+
+ def assert_has_calls(self, calls, any_order=False):
+ """assert the mock has been called with the specified calls.
+ The `mock_calls` list is checked for the calls.
+
+ If `any_order` is False (the default) then the calls must be
+ sequential. There can be extra calls before or after the
+ specified calls.
+
+ If `any_order` is True then the calls can be in any order, but
+ they must all appear in `mock_calls`."""
+ expected = [self._call_matcher(c) for c in calls]
+ cause = next((e for e in expected if isinstance(e, Exception)), None)
+ all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
+ if not any_order:
+ if expected not in all_calls:
+ if cause is None:
+ problem = 'Calls not found.'
+ else:
+ problem = ('Error processing expected calls.\n'
+ 'Errors: {}').format(
+ [e if isinstance(e, Exception) else None
+ for e in expected])
+ raise AssertionError(
+ f'{problem}\n'
+ f'Expected: {_CallList(calls)}'
+ f'{self._calls_repr(prefix="Actual").rstrip(".")}'
+ ) from cause
+ return
+
+ all_calls = list(all_calls)
+
+ not_found = []
+ for kall in expected:
+ try:
+ all_calls.remove(kall)
+ except ValueError:
+ not_found.append(kall)
+ if not_found:
+ raise AssertionError(
+ '%r does not contain all of %r in its call list, '
+ 'found %r instead' % (self._mock_name or 'mock',
+ tuple(not_found), all_calls)
+ ) from cause
+
+
+ def assert_any_call(self, /, *args, **kwargs):
+ """assert the mock has been called with the specified arguments.
+
+ The assert passes if the mock has *ever* been called, unlike
+ `assert_called_with` and `assert_called_once_with` that only pass if
+ the call is the most recent one."""
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ cause = expected if isinstance(expected, Exception) else None
+ actual = [self._call_matcher(c) for c in self.call_args_list]
+ if cause or expected not in _AnyComparer(actual):
+ expected_string = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(
+ '%s call not found' % expected_string
+ ) from cause
+
+
+ def _get_child_mock(self, /, **kw):
+ """Create the child mocks for attributes and return value.
+ By default child mocks will be the same type as the parent.
+ Subclasses of Mock may want to override this to customize the way
+ child mocks are made.
+
+ For non-callable mocks the callable variant will be used (rather than
+ any custom subclass)."""
+ _new_name = kw.get("_new_name")
+ if _new_name in self.__dict__['_spec_asyncs']:
+ return AsyncMock(**kw)
+
+ if self._mock_sealed:
+ attribute = f".{kw['name']}" if "name" in kw else "()"
+ mock_name = self._extract_mock_name() + attribute
+ raise AttributeError(mock_name)
+
+ _type = type(self)
+ if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
+ # Any asynchronous magic becomes an AsyncMock
+ klass = AsyncMock
+ elif issubclass(_type, AsyncMockMixin):
+ if (_new_name in _all_sync_magics or
+ self._mock_methods and _new_name in self._mock_methods):
+ # Any synchronous method on AsyncMock becomes a MagicMock
+ klass = MagicMock
+ else:
+ klass = AsyncMock
+ elif not issubclass(_type, CallableMixin):
+ if issubclass(_type, NonCallableMagicMock):
+ klass = MagicMock
+ elif issubclass(_type, NonCallableMock):
+ klass = Mock
+ else:
+ klass = _type.__mro__[1]
+ return klass(**kw)
+
+
+ def _calls_repr(self, prefix="Calls"):
+ """Renders self.mock_calls as a string.
+
+ Example: "\nCalls: [call(1), call(2)]."
+
+ If self.mock_calls is empty, an empty string is returned. The
+ output will be truncated if very long.
+ """
+ if not self.mock_calls:
+ return ""
+ return f"\n{prefix}: {safe_repr(self.mock_calls)}."
+
+
+_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
+
+
+class _AnyComparer(list):
+ """A list which checks if it contains a call which may have an
+ argument of ANY, flipping the components of item and self from
+ their traditional locations so that ANY is guaranteed to be on
+ the left."""
+ def __contains__(self, item):
+ for _call in self:
+ assert len(item) == len(_call)
+ if all([
+ expected == actual
+ for expected, actual in zip(item, _call)
+ ]):
+ return True
+ return False
+
+
+def _try_iter(obj):
+ if obj is None:
+ return obj
+ if _is_exception(obj):
+ return obj
+ if _callable(obj):
+ return obj
+ try:
+ return iter(obj)
+ except TypeError:
+ # XXXX backwards compatibility
+ # but this will blow up on first call - so maybe we should fail early?
+ return obj
+
+
+class CallableMixin(Base):
+
+ def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
+ wraps=None, name=None, spec_set=None, parent=None,
+ _spec_state=None, _new_name='', _new_parent=None, **kwargs):
+ self.__dict__['_mock_return_value'] = return_value
+ _safe_super(CallableMixin, self).__init__(
+ spec, wraps, name, spec_set, parent,
+ _spec_state, _new_name, _new_parent, **kwargs
+ )
+
+ self.side_effect = side_effect
+
+
+ def _mock_check_sig(self, /, *args, **kwargs):
+ # stub method that can be replaced with one with a specific signature
+ pass
+
+
+ def __call__(self, /, *args, **kwargs):
+ # can't use self in-case a function / method we are mocking uses self
+ # in the signature
+ self._mock_check_sig(*args, **kwargs)
+ self._increment_mock_call(*args, **kwargs)
+ return self._mock_call(*args, **kwargs)
+
+
+ def _mock_call(self, /, *args, **kwargs):
+ return self._execute_mock_call(*args, **kwargs)
+
+ def _increment_mock_call(self, /, *args, **kwargs):
+ self.called = True
+ self.call_count += 1
+
+ # handle call_args
+ # needs to be set here so assertions on call arguments pass before
+ # execution in the case of awaited calls
+ _call = _Call((args, kwargs), two=True)
+ self.call_args = _call
+ self.call_args_list.append(_call)
+
+ # initial stuff for method_calls:
+ do_method_calls = self._mock_parent is not None
+ method_call_name = self._mock_name
+
+ # initial stuff for mock_calls:
+ mock_call_name = self._mock_new_name
+ is_a_call = mock_call_name == '()'
+ self.mock_calls.append(_Call(('', args, kwargs)))
+
+ # follow up the chain of mocks:
+ _new_parent = self._mock_new_parent
+ while _new_parent is not None:
+
+ # handle method_calls:
+ if do_method_calls:
+ _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
+ do_method_calls = _new_parent._mock_parent is not None
+ if do_method_calls:
+ method_call_name = _new_parent._mock_name + '.' + method_call_name
+
+ # handle mock_calls:
+ this_mock_call = _Call((mock_call_name, args, kwargs))
+ _new_parent.mock_calls.append(this_mock_call)
+
+ if _new_parent._mock_new_name:
+ if is_a_call:
+ dot = ''
+ else:
+ dot = '.'
+ is_a_call = _new_parent._mock_new_name == '()'
+ mock_call_name = _new_parent._mock_new_name + dot + mock_call_name
+
+ # follow the parental chain:
+ _new_parent = _new_parent._mock_new_parent
+
+ def _execute_mock_call(self, /, *args, **kwargs):
+ # separate from _increment_mock_call so that awaited functions are
+ # executed separately from their call, also AsyncMock overrides this method
+
+ effect = self.side_effect
+ if effect is not None:
+ if _is_exception(effect):
+ raise effect
+ elif not _callable(effect):
+ result = next(effect)
+ if _is_exception(result):
+ raise result
+ else:
+ result = effect(*args, **kwargs)
+
+ if result is not DEFAULT:
+ return result
+
+ if self._mock_return_value is not DEFAULT:
+ return self.return_value
+
+ if self._mock_wraps is not None:
+ return self._mock_wraps(*args, **kwargs)
+
+ return self.return_value
+
+
+
+class Mock(CallableMixin, NonCallableMock):
+ """
+ Create a new `Mock` object. `Mock` takes several optional arguments
+ that specify the behaviour of the Mock object:
+
+ * `spec`: This can be either a list of strings or an existing object (a
+ class or instance) that acts as the specification for the mock object. If
+ you pass in an object then a list of strings is formed by calling dir on
+ the object (excluding unsupported magic attributes and methods). Accessing
+ any attribute not in this list will raise an `AttributeError`.
+
+ If `spec` is an object (rather than a list of strings) then
+ `mock.__class__` returns the class of the spec object. This allows mocks
+ to pass `isinstance` tests.
+
+ * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
+ or get an attribute on the mock that isn't on the object passed as
+ `spec_set` will raise an `AttributeError`.
+
+ * `side_effect`: A function to be called whenever the Mock is called. See
+ the `side_effect` attribute. Useful for raising exceptions or
+ dynamically changing return values. The function is called with the same
+ arguments as the mock, and unless it returns `DEFAULT`, the return
+ value of this function is used as the return value.
+
+ If `side_effect` is an iterable then each call to the mock will return
+ the next value from the iterable. If any of the members of the iterable
+ are exceptions they will be raised instead of returned.
+
+ * `return_value`: The value returned when the mock is called. By default
+ this is a new Mock (created on first access). See the
+ `return_value` attribute.
+
+ * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
+ calling the Mock will pass the call through to the wrapped object
+ (returning the real result). Attribute access on the mock will return a
+ Mock object that wraps the corresponding attribute of the wrapped object
+ (so attempting to access an attribute that doesn't exist will raise an
+ `AttributeError`).
+
+ If the mock has an explicit `return_value` set then calls are not passed
+ to the wrapped object and the `return_value` is returned instead.
+
+ * `name`: If the mock has a name then it will be used in the repr of the
+ mock. This can be useful for debugging. The name is propagated to child
+ mocks.
+
+ Mocks can also be called with arbitrary keyword arguments. These will be
+ used to set attributes on the mock after it is created.
+ """
+
+
+def _dot_lookup(thing, comp, import_path):
+ try:
+ return getattr(thing, comp)
+ except AttributeError:
+ __import__(import_path)
+ return getattr(thing, comp)
+
+
+def _importer(target):
+ components = target.split('.')
+ import_path = components.pop(0)
+ thing = __import__(import_path)
+
+ for comp in components:
+ import_path += ".%s" % comp
+ thing = _dot_lookup(thing, comp, import_path)
+ return thing
+
+
+# _check_spec_arg_typos takes kwargs from commands like patch and checks that
+# they don't contain common misspellings of arguments related to autospeccing.
+def _check_spec_arg_typos(kwargs_to_check):
+ typos = ("autospect", "auto_spec", "set_spec")
+ for typo in typos:
+ if typo in kwargs_to_check:
+ raise RuntimeError(
+ f"{typo!r} might be a typo; use unsafe=True if this is intended"
+ )
+
+
+class _patch(object):
+
+ attribute_name = None
+ _active_patches = []
+
+ def __init__(
+ self, getter, attribute, new, spec, create,
+ spec_set, autospec, new_callable, kwargs, *, unsafe=False
+ ):
+ if new_callable is not None:
+ if new is not DEFAULT:
+ raise ValueError(
+ "Cannot use 'new' and 'new_callable' together"
+ )
+ if autospec is not None:
+ raise ValueError(
+ "Cannot use 'autospec' and 'new_callable' together"
+ )
+ if not unsafe:
+ _check_spec_arg_typos(kwargs)
+ if _is_instance_mock(spec):
+ raise InvalidSpecError(
+ f'Cannot spec attr {attribute!r} as the spec '
+ f'has already been mocked out. [spec={spec!r}]')
+ if _is_instance_mock(spec_set):
+ raise InvalidSpecError(
+ f'Cannot spec attr {attribute!r} as the spec_set '
+ f'target has already been mocked out. [spec_set={spec_set!r}]')
+
+ self.getter = getter
+ self.attribute = attribute
+ self.new = new
+ self.new_callable = new_callable
+ self.spec = spec
+ self.create = create
+ self.has_local = False
+ self.spec_set = spec_set
+ self.autospec = autospec
+ self.kwargs = kwargs
+ self.additional_patchers = []
+
+
+ def copy(self):
+ patcher = _patch(
+ self.getter, self.attribute, self.new, self.spec,
+ self.create, self.spec_set,
+ self.autospec, self.new_callable, self.kwargs
+ )
+ patcher.attribute_name = self.attribute_name
+ patcher.additional_patchers = [
+ p.copy() for p in self.additional_patchers
+ ]
+ return patcher
+
+
+ def __call__(self, func):
+ if isinstance(func, type):
+ return self.decorate_class(func)
+ if inspect.iscoroutinefunction(func):
+ return self.decorate_async_callable(func)
+ return self.decorate_callable(func)
+
+
+ def decorate_class(self, klass):
+ for attr in dir(klass):
+ if not attr.startswith(patch.TEST_PREFIX):
+ continue
+
+ attr_value = getattr(klass, attr)
+ if not hasattr(attr_value, "__call__"):
+ continue
+
+ patcher = self.copy()
+ setattr(klass, attr, patcher(attr_value))
+ return klass
+
+
+ @contextlib.contextmanager
+ def decoration_helper(self, patched, args, keywargs):
+ extra_args = []
+ with contextlib.ExitStack() as exit_stack:
+ for patching in patched.patchings:
+ arg = exit_stack.enter_context(patching)
+ if patching.attribute_name is not None:
+ keywargs.update(arg)
+ elif patching.new is DEFAULT:
+ extra_args.append(arg)
+
+ args += tuple(extra_args)
+ yield (args, keywargs)
+
+
+ def decorate_callable(self, func):
+ # NB. Keep the method in sync with decorate_async_callable()
+ if hasattr(func, 'patchings'):
+ func.patchings.append(self)
+ return func
+
+ @wraps(func)
+ def patched(*args, **keywargs):
+ with self.decoration_helper(patched,
+ args,
+ keywargs) as (newargs, newkeywargs):
+ return func(*newargs, **newkeywargs)
+
+ patched.patchings = [self]
+ return patched
+
+
+ def decorate_async_callable(self, func):
+ # NB. Keep the method in sync with decorate_callable()
+ if hasattr(func, 'patchings'):
+ func.patchings.append(self)
+ return func
+
+ @wraps(func)
+ async def patched(*args, **keywargs):
+ with self.decoration_helper(patched,
+ args,
+ keywargs) as (newargs, newkeywargs):
+ return await func(*newargs, **newkeywargs)
+
+ patched.patchings = [self]
+ return patched
+
+
+ def get_original(self):
+ target = self.getter()
+ name = self.attribute
+
+ original = DEFAULT
+ local = False
+
+ try:
+ original = target.__dict__[name]
+ except (AttributeError, KeyError):
+ original = getattr(target, name, DEFAULT)
+ else:
+ local = True
+
+ if name in _builtins and isinstance(target, ModuleType):
+ self.create = True
+
+ if not self.create and original is DEFAULT:
+ raise AttributeError(
+ "%s does not have the attribute %r" % (target, name)
+ )
+ return original, local
+
+
+ def __enter__(self):
+ """Perform the patch."""
+ new, spec, spec_set = self.new, self.spec, self.spec_set
+ autospec, kwargs = self.autospec, self.kwargs
+ new_callable = self.new_callable
+ self.target = self.getter()
+
+ # normalise False to None
+ if spec is False:
+ spec = None
+ if spec_set is False:
+ spec_set = None
+ if autospec is False:
+ autospec = None
+
+ if spec is not None and autospec is not None:
+ raise TypeError("Can't specify spec and autospec")
+ if ((spec is not None or autospec is not None) and
+ spec_set not in (True, None)):
+ raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
+
+ original, local = self.get_original()
+
+ if new is DEFAULT and autospec is None:
+ inherit = False
+ if spec is True:
+ # set spec to the object we are replacing
+ spec = original
+ if spec_set is True:
+ spec_set = original
+ spec = None
+ elif spec is not None:
+ if spec_set is True:
+ spec_set = spec
+ spec = None
+ elif spec_set is True:
+ spec_set = original
+
+ if spec is not None or spec_set is not None:
+ if original is DEFAULT:
+ raise TypeError("Can't use 'spec' with create=True")
+ if isinstance(original, type):
+ # If we're patching out a class and there is a spec
+ inherit = True
+ if spec is None and _is_async_obj(original):
+ Klass = AsyncMock
+ else:
+ Klass = MagicMock
+ _kwargs = {}
+ if new_callable is not None:
+ Klass = new_callable
+ elif spec is not None or spec_set is not None:
+ this_spec = spec
+ if spec_set is not None:
+ this_spec = spec_set
+ if _is_list(this_spec):
+ not_callable = '__call__' not in this_spec
+ else:
+ not_callable = not callable(this_spec)
+ if _is_async_obj(this_spec):
+ Klass = AsyncMock
+ elif not_callable:
+ Klass = NonCallableMagicMock
+
+ if spec is not None:
+ _kwargs['spec'] = spec
+ if spec_set is not None:
+ _kwargs['spec_set'] = spec_set
+
+ # add a name to mocks
+ if (isinstance(Klass, type) and
+ issubclass(Klass, NonCallableMock) and self.attribute):
+ _kwargs['name'] = self.attribute
+
+ _kwargs.update(kwargs)
+ new = Klass(**_kwargs)
+
+ if inherit and _is_instance_mock(new):
+ # we can only tell if the instance should be callable if the
+ # spec is not a list
+ this_spec = spec
+ if spec_set is not None:
+ this_spec = spec_set
+ if (not _is_list(this_spec) and not
+ _instance_callable(this_spec)):
+ Klass = NonCallableMagicMock
+
+ _kwargs.pop('name')
+ new.return_value = Klass(_new_parent=new, _new_name='()',
+ **_kwargs)
+ elif autospec is not None:
+ # spec is ignored, new *must* be default, spec_set is treated
+ # as a boolean. Should we check spec is not None and that spec_set
+ # is a bool?
+ if new is not DEFAULT:
+ raise TypeError(
+ "autospec creates the mock for you. Can't specify "
+ "autospec and new."
+ )
+ if original is DEFAULT:
+ raise TypeError("Can't use 'autospec' with create=True")
+ spec_set = bool(spec_set)
+ if autospec is True:
+ autospec = original
+
+ if _is_instance_mock(self.target):
+ raise InvalidSpecError(
+ f'Cannot autospec attr {self.attribute!r} as the patch '
+ f'target has already been mocked out. '
+ f'[target={self.target!r}, attr={autospec!r}]')
+ if _is_instance_mock(autospec):
+ target_name = getattr(self.target, '__name__', self.target)
+ raise InvalidSpecError(
+ f'Cannot autospec attr {self.attribute!r} from target '
+ f'{target_name!r} as it has already been mocked out. '
+ f'[target={self.target!r}, attr={autospec!r}]')
+
+ new = create_autospec(autospec, spec_set=spec_set,
+ _name=self.attribute, **kwargs)
+ elif kwargs:
+ # can't set keyword args when we aren't creating the mock
+ # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
+ raise TypeError("Can't pass kwargs to a mock we aren't creating")
+
+ new_attr = new
+
+ self.temp_original = original
+ self.is_local = local
+ self._exit_stack = contextlib.ExitStack()
+ try:
+ setattr(self.target, self.attribute, new_attr)
+ if self.attribute_name is not None:
+ extra_args = {}
+ if self.new is DEFAULT:
+ extra_args[self.attribute_name] = new
+ for patching in self.additional_patchers:
+ arg = self._exit_stack.enter_context(patching)
+ if patching.new is DEFAULT:
+ extra_args.update(arg)
+ return extra_args
+
+ return new
+ except:
+ if not self.__exit__(*sys.exc_info()):
+ raise
+
+ def __exit__(self, *exc_info):
+ """Undo the patch."""
+ if self.is_local and self.temp_original is not DEFAULT:
+ setattr(self.target, self.attribute, self.temp_original)
+ else:
+ delattr(self.target, self.attribute)
+ if not self.create and (not hasattr(self.target, self.attribute) or
+ self.attribute in ('__doc__', '__module__',
+ '__defaults__', '__annotations__',
+ '__kwdefaults__')):
+ # needed for proxy objects like django settings
+ setattr(self.target, self.attribute, self.temp_original)
+
+ del self.temp_original
+ del self.is_local
+ del self.target
+ exit_stack = self._exit_stack
+ del self._exit_stack
+ return exit_stack.__exit__(*exc_info)
+
+
+ def start(self):
+ """Activate a patch, returning any created mock."""
+ result = self.__enter__()
+ self._active_patches.append(self)
+ return result
+
+
+ def stop(self):
+ """Stop an active patch."""
+ try:
+ self._active_patches.remove(self)
+ except ValueError:
+ # If the patch hasn't been started this will fail
+ return None
+
+ return self.__exit__(None, None, None)
+
+
+
+def _get_target(target):
+ try:
+ target, attribute = target.rsplit('.', 1)
+ except (TypeError, ValueError, AttributeError):
+ raise TypeError(
+ f"Need a valid target to patch. You supplied: {target!r}")
+ getter = lambda: _importer(target)
+ return getter, attribute
+
+
+def _patch_object(
+ target, attribute, new=DEFAULT, spec=None,
+ create=False, spec_set=None, autospec=None,
+ new_callable=None, *, unsafe=False, **kwargs
+ ):
+ """
+ patch the named member (`attribute`) on an object (`target`) with a mock
+ object.
+
+ `patch.object` can be used as a decorator, class decorator or a context
+ manager. Arguments `new`, `spec`, `create`, `spec_set`,
+ `autospec` and `new_callable` have the same meaning as for `patch`. Like
+ `patch`, `patch.object` takes arbitrary keyword arguments for configuring
+ the mock object it creates.
+
+ When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
+ for choosing which methods to wrap.
+ """
+ if type(target) is str:
+ raise TypeError(
+ f"{target!r} must be the actual object to be patched, not a str"
+ )
+ getter = lambda: target
+ return _patch(
+ getter, attribute, new, spec, create,
+ spec_set, autospec, new_callable, kwargs, unsafe=unsafe
+ )
+
+
+def _patch_multiple(target, spec=None, create=False, spec_set=None,
+ autospec=None, new_callable=None, **kwargs):
+ """Perform multiple patches in a single call. It takes the object to be
+ patched (either as an object or a string to fetch the object by importing)
+ and keyword arguments for the patches::
+
+ with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
+ ...
+
+ Use `DEFAULT` as the value if you want `patch.multiple` to create
+ mocks for you. In this case the created mocks are passed into a decorated
+ function by keyword, and a dictionary is returned when `patch.multiple` is
+ used as a context manager.
+
+ `patch.multiple` can be used as a decorator, class decorator or a context
+ manager. The arguments `spec`, `spec_set`, `create`,
+ `autospec` and `new_callable` have the same meaning as for `patch`. These
+ arguments will be applied to *all* patches done by `patch.multiple`.
+
+ When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
+ for choosing which methods to wrap.
+ """
+ if type(target) is str:
+ getter = lambda: _importer(target)
+ else:
+ getter = lambda: target
+
+ if not kwargs:
+ raise ValueError(
+ 'Must supply at least one keyword argument with patch.multiple'
+ )
+ # need to wrap in a list for python 3, where items is a view
+ items = list(kwargs.items())
+ attribute, new = items[0]
+ patcher = _patch(
+ getter, attribute, new, spec, create, spec_set,
+ autospec, new_callable, {}
+ )
+ patcher.attribute_name = attribute
+ for attribute, new in items[1:]:
+ this_patcher = _patch(
+ getter, attribute, new, spec, create, spec_set,
+ autospec, new_callable, {}
+ )
+ this_patcher.attribute_name = attribute
+ patcher.additional_patchers.append(this_patcher)
+ return patcher
+
+
+def patch(
+ target, new=DEFAULT, spec=None, create=False,
+ spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
+ ):
+ """
+ `patch` acts as a function decorator, class decorator or a context
+ manager. Inside the body of the function or with statement, the `target`
+ is patched with a `new` object. When the function/with statement exits
+ the patch is undone.
+
+ If `new` is omitted, then the target is replaced with an
+ `AsyncMock if the patched object is an async function or a
+ `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
+ omitted, the created mock is passed in as an extra argument to the
+ decorated function. If `patch` is used as a context manager the created
+ mock is returned by the context manager.
+
+ `target` should be a string in the form `'package.module.ClassName'`. The
+ `target` is imported and the specified object replaced with the `new`
+ object, so the `target` must be importable from the environment you are
+ calling `patch` from. The target is imported when the decorated function
+ is executed, not at decoration time.
+
+ The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
+ if patch is creating one for you.
+
+ In addition you can pass `spec=True` or `spec_set=True`, which causes
+ patch to pass in the object being mocked as the spec/spec_set object.
+
+ `new_callable` allows you to specify a different class, or callable object,
+ that will be called to create the `new` object. By default `AsyncMock` is
+ used for async functions and `MagicMock` for the rest.
+
+ A more powerful form of `spec` is `autospec`. If you set `autospec=True`
+ then the mock will be created with a spec from the object being replaced.
+ All attributes of the mock will also have the spec of the corresponding
+ attribute of the object being replaced. Methods and functions being
+ mocked will have their arguments checked and will raise a `TypeError` if
+ they are called with the wrong signature. For mocks replacing a class,
+ their return value (the 'instance') will have the same spec as the class.
+
+ Instead of `autospec=True` you can pass `autospec=some_object` to use an
+ arbitrary object as the spec instead of the one being replaced.
+
+ By default `patch` will fail to replace attributes that don't exist. If
+ you pass in `create=True`, and the attribute doesn't exist, patch will
+ create the attribute for you when the patched function is called, and
+ delete it again afterwards. This is useful for writing tests against
+ attributes that your production code creates at runtime. It is off by
+ default because it can be dangerous. With it switched on you can write
+ passing tests against APIs that don't actually exist!
+
+ Patch can be used as a `TestCase` class decorator. It works by
+ decorating each test method in the class. This reduces the boilerplate
+ code when your test methods share a common patchings set. `patch` finds
+ tests by looking for method names that start with `patch.TEST_PREFIX`.
+ By default this is `test`, which matches the way `unittest` finds tests.
+ You can specify an alternative prefix by setting `patch.TEST_PREFIX`.
+
+ Patch can be used as a context manager, with the with statement. Here the
+ patching applies to the indented block after the with statement. If you
+ use "as" then the patched object will be bound to the name after the
+ "as"; very useful if `patch` is creating a mock object for you.
+
+ Patch will raise a `RuntimeError` if passed some common misspellings of
+ the arguments autospec and spec_set. Pass the argument `unsafe` with the
+ value True to disable that check.
+
+ `patch` takes arbitrary keyword arguments. These will be passed to
+ `AsyncMock` if the patched object is asynchronous, to `MagicMock`
+ otherwise or to `new_callable` if specified.
+
+ `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
+ available for alternate use-cases.
+ """
+ getter, attribute = _get_target(target)
+ return _patch(
+ getter, attribute, new, spec, create,
+ spec_set, autospec, new_callable, kwargs, unsafe=unsafe
+ )
+
+
+class _patch_dict(object):
+ """
+ Patch a dictionary, or dictionary like object, and restore the dictionary
+ to its original state after the test.
+
+ `in_dict` can be a dictionary or a mapping like container. If it is a
+ mapping then it must at least support getting, setting and deleting items
+ plus iterating over keys.
+
+ `in_dict` can also be a string specifying the name of the dictionary, which
+ will then be fetched by importing it.
+
+ `values` can be a dictionary of values to set in the dictionary. `values`
+ can also be an iterable of `(key, value)` pairs.
+
+ If `clear` is True then the dictionary will be cleared before the new
+ values are set.
+
+ `patch.dict` can also be called with arbitrary keyword arguments to set
+ values in the dictionary::
+
+ with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
+ ...
+
+ `patch.dict` can be used as a context manager, decorator or class
+ decorator. When used as a class decorator `patch.dict` honours
+ `patch.TEST_PREFIX` for choosing which methods to wrap.
+ """
+
+ def __init__(self, in_dict, values=(), clear=False, **kwargs):
+ self.in_dict = in_dict
+ # support any argument supported by dict(...) constructor
+ self.values = dict(values)
+ self.values.update(kwargs)
+ self.clear = clear
+ self._original = None
+
+
+ def __call__(self, f):
+ if isinstance(f, type):
+ return self.decorate_class(f)
+ @wraps(f)
+ def _inner(*args, **kw):
+ self._patch_dict()
+ try:
+ return f(*args, **kw)
+ finally:
+ self._unpatch_dict()
+
+ return _inner
+
+
+ def decorate_class(self, klass):
+ for attr in dir(klass):
+ attr_value = getattr(klass, attr)
+ if (attr.startswith(patch.TEST_PREFIX) and
+ hasattr(attr_value, "__call__")):
+ decorator = _patch_dict(self.in_dict, self.values, self.clear)
+ decorated = decorator(attr_value)
+ setattr(klass, attr, decorated)
+ return klass
+
+
+ def __enter__(self):
+ """Patch the dict."""
+ self._patch_dict()
+ return self.in_dict
+
+
+ def _patch_dict(self):
+ values = self.values
+ if isinstance(self.in_dict, str):
+ self.in_dict = _importer(self.in_dict)
+ in_dict = self.in_dict
+ clear = self.clear
+
+ try:
+ original = in_dict.copy()
+ except AttributeError:
+ # dict like object with no copy method
+ # must support iteration over keys
+ original = {}
+ for key in in_dict:
+ original[key] = in_dict[key]
+ self._original = original
+
+ if clear:
+ _clear_dict(in_dict)
+
+ try:
+ in_dict.update(values)
+ except AttributeError:
+ # dict like object with no update method
+ for key in values:
+ in_dict[key] = values[key]
+
+
+ def _unpatch_dict(self):
+ in_dict = self.in_dict
+ original = self._original
+
+ _clear_dict(in_dict)
+
+ try:
+ in_dict.update(original)
+ except AttributeError:
+ for key in original:
+ in_dict[key] = original[key]
+
+
+ def __exit__(self, *args):
+ """Unpatch the dict."""
+ if self._original is not None:
+ self._unpatch_dict()
+ return False
+
+
+ def start(self):
+ """Activate a patch, returning any created mock."""
+ result = self.__enter__()
+ _patch._active_patches.append(self)
+ return result
+
+
+ def stop(self):
+ """Stop an active patch."""
+ try:
+ _patch._active_patches.remove(self)
+ except ValueError:
+ # If the patch hasn't been started this will fail
+ return None
+
+ return self.__exit__(None, None, None)
+
+
+def _clear_dict(in_dict):
+ try:
+ in_dict.clear()
+ except AttributeError:
+ keys = list(in_dict)
+ for key in keys:
+ del in_dict[key]
+
+
+def _patch_stopall():
+ """Stop all active patches. LIFO to unroll nested patches."""
+ for patch in reversed(_patch._active_patches):
+ patch.stop()
+
+
+patch.object = _patch_object
+patch.dict = _patch_dict
+patch.multiple = _patch_multiple
+patch.stopall = _patch_stopall
+patch.TEST_PREFIX = 'test'
+
+magic_methods = (
+ "lt le gt ge eq ne "
+ "getitem setitem delitem "
+ "len contains iter "
+ "hash str sizeof "
+ "enter exit "
+ # we added divmod and rdivmod here instead of numerics
+ # because there is no idivmod
+ "divmod rdivmod neg pos abs invert "
+ "complex int float index "
+ "round trunc floor ceil "
+ "bool next "
+ "fspath "
+ "aiter "
+)
+
+numerics = (
+ "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv"
+)
+inplace = ' '.join('i%s' % n for n in numerics.split())
+right = ' '.join('r%s' % n for n in numerics.split())
+
+# not including __prepare__, __instancecheck__, __subclasscheck__
+# (as they are metaclass methods)
+# __del__ is not supported at all as it causes problems if it exists
+
+_non_defaults = {
+ '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
+ '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
+ '__getstate__', '__setstate__', '__getformat__', '__setformat__',
+ '__repr__', '__dir__', '__subclasses__', '__format__',
+ '__getnewargs_ex__',
+}
+
+
+def _get_method(name, func):
+ "Turns a callable object (like a mock) into a real function"
+ def method(self, /, *args, **kw):
+ return func(self, *args, **kw)
+ method.__name__ = name
+ return method
+
+
+_magics = {
+ '__%s__' % method for method in
+ ' '.join([magic_methods, numerics, inplace, right]).split()
+}
+
+# Magic methods used for async `with` statements
+_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
+# Magic methods that are only used with async calls but are synchronous functions themselves
+_sync_async_magics = {"__aiter__"}
+_async_magics = _async_method_magics | _sync_async_magics
+
+_all_sync_magics = _magics | _non_defaults
+_all_magics = _all_sync_magics | _async_magics
+
+_unsupported_magics = {
+ '__getattr__', '__setattr__',
+ '__init__', '__new__', '__prepare__',
+ '__instancecheck__', '__subclasscheck__',
+ '__del__'
+}
+
+_calculate_return_value = {
+ '__hash__': lambda self: object.__hash__(self),
+ '__str__': lambda self: object.__str__(self),
+ '__sizeof__': lambda self: object.__sizeof__(self),
+ '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
+}
+
+_return_values = {
+ '__lt__': NotImplemented,
+ '__gt__': NotImplemented,
+ '__le__': NotImplemented,
+ '__ge__': NotImplemented,
+ '__int__': 1,
+ '__contains__': False,
+ '__len__': 0,
+ '__exit__': False,
+ '__complex__': 1j,
+ '__float__': 1.0,
+ '__bool__': True,
+ '__index__': 1,
+ '__aexit__': False,
+}
+
+
+def _get_eq(self):
+ def __eq__(other):
+ ret_val = self.__eq__._mock_return_value
+ if ret_val is not DEFAULT:
+ return ret_val
+ if self is other:
+ return True
+ return NotImplemented
+ return __eq__
+
+def _get_ne(self):
+ def __ne__(other):
+ if self.__ne__._mock_return_value is not DEFAULT:
+ return DEFAULT
+ if self is other:
+ return False
+ return NotImplemented
+ return __ne__
+
+def _get_iter(self):
+ def __iter__():
+ ret_val = self.__iter__._mock_return_value
+ if ret_val is DEFAULT:
+ return iter([])
+ # if ret_val was already an iterator, then calling iter on it should
+ # return the iterator unchanged
+ return iter(ret_val)
+ return __iter__
+
+def _get_async_iter(self):
+ def __aiter__():
+ ret_val = self.__aiter__._mock_return_value
+ if ret_val is DEFAULT:
+ return _AsyncIterator(iter([]))
+ return _AsyncIterator(iter(ret_val))
+ return __aiter__
+
+_side_effect_methods = {
+ '__eq__': _get_eq,
+ '__ne__': _get_ne,
+ '__iter__': _get_iter,
+ '__aiter__': _get_async_iter
+}
+
+
+
+def _set_return_value(mock, method, name):
+ fixed = _return_values.get(name, DEFAULT)
+ if fixed is not DEFAULT:
+ method.return_value = fixed
+ return
+
+ return_calculator = _calculate_return_value.get(name)
+ if return_calculator is not None:
+ return_value = return_calculator(mock)
+ method.return_value = return_value
+ return
+
+ side_effector = _side_effect_methods.get(name)
+ if side_effector is not None:
+ method.side_effect = side_effector(mock)
+
+
+
+class MagicMixin(Base):
+ def __init__(self, /, *args, **kw):
+ self._mock_set_magics() # make magic work for kwargs in init
+ _safe_super(MagicMixin, self).__init__(*args, **kw)
+ self._mock_set_magics() # fix magic broken by upper level init
+
+
+ def _mock_set_magics(self):
+ orig_magics = _magics | _async_method_magics
+ these_magics = orig_magics
+
+ if getattr(self, "_mock_methods", None) is not None:
+ these_magics = orig_magics.intersection(self._mock_methods)
+
+ remove_magics = set()
+ remove_magics = orig_magics - these_magics
+
+ for entry in remove_magics:
+ if entry in type(self).__dict__:
+ # remove unneeded magic methods
+ delattr(self, entry)
+
+ # don't overwrite existing attributes if called a second time
+ these_magics = these_magics - set(type(self).__dict__)
+
+ _type = type(self)
+ for entry in these_magics:
+ setattr(_type, entry, MagicProxy(entry, self))
+
+
+
+class NonCallableMagicMock(MagicMixin, NonCallableMock):
+ """A version of `MagicMock` that isn't callable."""
+ def mock_add_spec(self, spec, spec_set=False):
+ """Add a spec to a mock. `spec` can either be an object or a
+ list of strings. Only attributes on the `spec` can be fetched as
+ attributes from the mock.
+
+ If `spec_set` is True then only attributes on the spec can be set."""
+ self._mock_add_spec(spec, spec_set)
+ self._mock_set_magics()
+
+
+class AsyncMagicMixin(MagicMixin):
+ def __init__(self, /, *args, **kw):
+ self._mock_set_magics() # make magic work for kwargs in init
+ _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
+ self._mock_set_magics() # fix magic broken by upper level init
+
+class MagicMock(MagicMixin, Mock):
+ """
+ MagicMock is a subclass of Mock with default implementations
+ of most of the magic methods. You can use MagicMock without having to
+ configure the magic methods yourself.
+
+ If you use the `spec` or `spec_set` arguments then *only* magic
+ methods that exist in the spec will be created.
+
+ Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
+ """
+ def mock_add_spec(self, spec, spec_set=False):
+ """Add a spec to a mock. `spec` can either be an object or a
+ list of strings. Only attributes on the `spec` can be fetched as
+ attributes from the mock.
+
+ If `spec_set` is True then only attributes on the spec can be set."""
+ self._mock_add_spec(spec, spec_set)
+ self._mock_set_magics()
+
+
+
+class MagicProxy(Base):
+ def __init__(self, name, parent):
+ self.name = name
+ self.parent = parent
+
+ def create_mock(self):
+ entry = self.name
+ parent = self.parent
+ m = parent._get_child_mock(name=entry, _new_name=entry,
+ _new_parent=parent)
+ setattr(parent, entry, m)
+ _set_return_value(parent, m, entry)
+ return m
+
+ def __get__(self, obj, _type=None):
+ return self.create_mock()
+
+
+class AsyncMockMixin(Base):
+ await_count = _delegating_property('await_count')
+ await_args = _delegating_property('await_args')
+ await_args_list = _delegating_property('await_args_list')
+
+ def __init__(self, /, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # iscoroutinefunction() checks _is_coroutine property to say if an
+ # object is a coroutine. Without this check it looks to see if it is a
+ # function/method, which in this case it is not (since it is an
+ # AsyncMock).
+ # It is set through __dict__ because when spec_set is True, this
+ # attribute is likely undefined.
+ self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
+ self.__dict__['_mock_await_count'] = 0
+ self.__dict__['_mock_await_args'] = None
+ self.__dict__['_mock_await_args_list'] = _CallList()
+ code_mock = NonCallableMock(spec_set=CodeType)
+ code_mock.co_flags = inspect.CO_COROUTINE
+ self.__dict__['__code__'] = code_mock
+
+ async def _execute_mock_call(self, /, *args, **kwargs):
+ # This is nearly just like super(), except for special handling
+ # of coroutines
+
+ _call = _Call((args, kwargs), two=True)
+ self.await_count += 1
+ self.await_args = _call
+ self.await_args_list.append(_call)
+
+ effect = self.side_effect
+ if effect is not None:
+ if _is_exception(effect):
+ raise effect
+ elif not _callable(effect):
+ try:
+ result = next(effect)
+ except StopIteration:
+ # It is impossible to propogate a StopIteration
+ # through coroutines because of PEP 479
+ raise StopAsyncIteration
+ if _is_exception(result):
+ raise result
+ elif iscoroutinefunction(effect):
+ result = await effect(*args, **kwargs)
+ else:
+ result = effect(*args, **kwargs)
+
+ if result is not DEFAULT:
+ return result
+
+ if self._mock_return_value is not DEFAULT:
+ return self.return_value
+
+ if self._mock_wraps is not None:
+ if iscoroutinefunction(self._mock_wraps):
+ return await self._mock_wraps(*args, **kwargs)
+ return self._mock_wraps(*args, **kwargs)
+
+ return self.return_value
+
+ def assert_awaited(self):
+ """
+ Assert that the mock was awaited at least once.
+ """
+ if self.await_count == 0:
+ msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
+ raise AssertionError(msg)
+
+ def assert_awaited_once(self):
+ """
+ Assert that the mock was awaited exactly once.
+ """
+ if not self.await_count == 1:
+ msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+
+ def assert_awaited_with(self, /, *args, **kwargs):
+ """
+ Assert that the last await was with the specified arguments.
+ """
+ if self.await_args is None:
+ expected = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(f'Expected await: {expected}\nNot awaited')
+
+ def _error_message():
+ msg = self._format_mock_failure_message(args, kwargs, action='await')
+ return msg
+
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ actual = self._call_matcher(self.await_args)
+ if actual != expected:
+ cause = expected if isinstance(expected, Exception) else None
+ raise AssertionError(_error_message()) from cause
+
+ def assert_awaited_once_with(self, /, *args, **kwargs):
+ """
+ Assert that the mock was awaited exactly once and with the specified
+ arguments.
+ """
+ if not self.await_count == 1:
+ msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+ return self.assert_awaited_with(*args, **kwargs)
+
+ def assert_any_await(self, /, *args, **kwargs):
+ """
+ Assert the mock has ever been awaited with the specified arguments.
+ """
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ cause = expected if isinstance(expected, Exception) else None
+ actual = [self._call_matcher(c) for c in self.await_args_list]
+ if cause or expected not in _AnyComparer(actual):
+ expected_string = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(
+ '%s await not found' % expected_string
+ ) from cause
+
+ def assert_has_awaits(self, calls, any_order=False):
+ """
+ Assert the mock has been awaited with the specified calls.
+ The :attr:`await_args_list` list is checked for the awaits.
+
+ If `any_order` is False (the default) then the awaits must be
+ sequential. There can be extra calls before or after the
+ specified awaits.
+
+ If `any_order` is True then the awaits can be in any order, but
+ they must all appear in :attr:`await_args_list`.
+ """
+ expected = [self._call_matcher(c) for c in calls]
+ cause = next((e for e in expected if isinstance(e, Exception)), None)
+ all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
+ if not any_order:
+ if expected not in all_awaits:
+ if cause is None:
+ problem = 'Awaits not found.'
+ else:
+ problem = ('Error processing expected awaits.\n'
+ 'Errors: {}').format(
+ [e if isinstance(e, Exception) else None
+ for e in expected])
+ raise AssertionError(
+ f'{problem}\n'
+ f'Expected: {_CallList(calls)}\n'
+ f'Actual: {self.await_args_list}'
+ ) from cause
+ return
+
+ all_awaits = list(all_awaits)
+
+ not_found = []
+ for kall in expected:
+ try:
+ all_awaits.remove(kall)
+ except ValueError:
+ not_found.append(kall)
+ if not_found:
+ raise AssertionError(
+ '%r not all found in await list' % (tuple(not_found),)
+ ) from cause
+
+ def assert_not_awaited(self):
+ """
+ Assert that the mock was never awaited.
+ """
+ if self.await_count != 0:
+ msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+
+ def reset_mock(self, /, *args, **kwargs):
+ """
+ See :func:`.Mock.reset_mock()`
+ """
+ super().reset_mock(*args, **kwargs)
+ self.await_count = 0
+ self.await_args = None
+ self.await_args_list = _CallList()
+
+
+class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
+ """
+ Enhance :class:`Mock` with features allowing to mock
+ an async function.
+
+ The :class:`AsyncMock` object will behave so the object is
+ recognized as an async function, and the result of a call is an awaitable:
+
+ >>> mock = AsyncMock()
+ >>> iscoroutinefunction(mock)
+ True
+ >>> inspect.isawaitable(mock())
+ True
+
+
+ The result of ``mock()`` is an async function which will have the outcome
+ of ``side_effect`` or ``return_value``:
+
+ - if ``side_effect`` is a function, the async function will return the
+ result of that function,
+ - if ``side_effect`` is an exception, the async function will raise the
+ exception,
+ - if ``side_effect`` is an iterable, the async function will return the
+ next value of the iterable, however, if the sequence of result is
+ exhausted, ``StopIteration`` is raised immediately,
+ - if ``side_effect`` is not defined, the async function will return the
+ value defined by ``return_value``, hence, by default, the async function
+ returns a new :class:`AsyncMock` object.
+
+ If the outcome of ``side_effect`` or ``return_value`` is an async function,
+ the mock async function obtained when the mock object is called will be this
+ async function itself (and not an async function returning an async
+ function).
+
+ The test author can also specify a wrapped object with ``wraps``. In this
+ case, the :class:`Mock` object behavior is the same as with an
+ :class:`.Mock` object: the wrapped object may have methods
+ defined as async function functions.
+
+ Based on Martin Richard's asynctest project.
+ """
+
+
+class _ANY(object):
+ "A helper object that compares equal to everything."
+
+ def __eq__(self, other):
+ return True
+
+ def __ne__(self, other):
+ return False
+
+ def __repr__(self):
+ return '<ANY>'
+
+ANY = _ANY()
+
+
+
+def _format_call_signature(name, args, kwargs):
+ message = '%s(%%s)' % name
+ formatted_args = ''
+ args_string = ', '.join([repr(arg) for arg in args])
+ kwargs_string = ', '.join([
+ '%s=%r' % (key, value) for key, value in kwargs.items()
+ ])
+ if args_string:
+ formatted_args = args_string
+ if kwargs_string:
+ if formatted_args:
+ formatted_args += ', '
+ formatted_args += kwargs_string
+
+ return message % formatted_args
+
+
+
+class _Call(tuple):
+ """
+ A tuple for holding the results of a call to a mock, either in the form
+ `(args, kwargs)` or `(name, args, kwargs)`.
+
+ If args or kwargs are empty then a call tuple will compare equal to
+ a tuple without those values. This makes comparisons less verbose::
+
+ _Call(('name', (), {})) == ('name',)
+ _Call(('name', (1,), {})) == ('name', (1,))
+ _Call(((), {'a': 'b'})) == ({'a': 'b'},)
+
+ The `_Call` object provides a useful shortcut for comparing with call::
+
+ _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
+ _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
+
+ If the _Call has no name then it will match any name.
+ """
+ def __new__(cls, value=(), name='', parent=None, two=False,
+ from_kall=True):
+ args = ()
+ kwargs = {}
+ _len = len(value)
+ if _len == 3:
+ name, args, kwargs = value
+ elif _len == 2:
+ first, second = value
+ if isinstance(first, str):
+ name = first
+ if isinstance(second, tuple):
+ args = second
+ else:
+ kwargs = second
+ else:
+ args, kwargs = first, second
+ elif _len == 1:
+ value, = value
+ if isinstance(value, str):
+ name = value
+ elif isinstance(value, tuple):
+ args = value
+ else:
+ kwargs = value
+
+ if two:
+ return tuple.__new__(cls, (args, kwargs))
+
+ return tuple.__new__(cls, (name, args, kwargs))
+
+
+ def __init__(self, value=(), name=None, parent=None, two=False,
+ from_kall=True):
+ self._mock_name = name
+ self._mock_parent = parent
+ self._mock_from_kall = from_kall
+
+
+ def __eq__(self, other):
+ try:
+ len_other = len(other)
+ except TypeError:
+ return NotImplemented
+
+ self_name = ''
+ if len(self) == 2:
+ self_args, self_kwargs = self
+ else:
+ self_name, self_args, self_kwargs = self
+
+ if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
+ and self._mock_parent != other._mock_parent):
+ return False
+
+ other_name = ''
+ if len_other == 0:
+ other_args, other_kwargs = (), {}
+ elif len_other == 3:
+ other_name, other_args, other_kwargs = other
+ elif len_other == 1:
+ value, = other
+ if isinstance(value, tuple):
+ other_args = value
+ other_kwargs = {}
+ elif isinstance(value, str):
+ other_name = value
+ other_args, other_kwargs = (), {}
+ else:
+ other_args = ()
+ other_kwargs = value
+ elif len_other == 2:
+ # could be (name, args) or (name, kwargs) or (args, kwargs)
+ first, second = other
+ if isinstance(first, str):
+ other_name = first
+ if isinstance(second, tuple):
+ other_args, other_kwargs = second, {}
+ else:
+ other_args, other_kwargs = (), second
+ else:
+ other_args, other_kwargs = first, second
+ else:
+ return False
+
+ if self_name and other_name != self_name:
+ return False
+
+ # this order is important for ANY to work!
+ return (other_args, other_kwargs) == (self_args, self_kwargs)
+
+
+ __ne__ = object.__ne__
+
+
+ def __call__(self, /, *args, **kwargs):
+ if self._mock_name is None:
+ return _Call(('', args, kwargs), name='()')
+
+ name = self._mock_name + '()'
+ return _Call((self._mock_name, args, kwargs), name=name, parent=self)
+
+
+ def __getattr__(self, attr):
+ if self._mock_name is None:
+ return _Call(name=attr, from_kall=False)
+ name = '%s.%s' % (self._mock_name, attr)
+ return _Call(name=name, parent=self, from_kall=False)
+
+
+ def __getattribute__(self, attr):
+ if attr in tuple.__dict__:
+ raise AttributeError
+ return tuple.__getattribute__(self, attr)
+
+
+ def _get_call_arguments(self):
+ if len(self) == 2:
+ args, kwargs = self
+ else:
+ name, args, kwargs = self
+
+ return args, kwargs
+
+ @property
+ def args(self):
+ return self._get_call_arguments()[0]
+
+ @property
+ def kwargs(self):
+ return self._get_call_arguments()[1]
+
+ def __repr__(self):
+ if not self._mock_from_kall:
+ name = self._mock_name or 'call'
+ if name.startswith('()'):
+ name = 'call%s' % name
+ return name
+
+ if len(self) == 2:
+ name = 'call'
+ args, kwargs = self
+ else:
+ name, args, kwargs = self
+ if not name:
+ name = 'call'
+ elif not name.startswith('()'):
+ name = 'call.%s' % name
+ else:
+ name = 'call%s' % name
+ return _format_call_signature(name, args, kwargs)
+
+
+ def call_list(self):
+ """For a call object that represents multiple calls, `call_list`
+ returns a list of all the intermediate calls as well as the
+ final call."""
+ vals = []
+ thing = self
+ while thing is not None:
+ if thing._mock_from_kall:
+ vals.append(thing)
+ thing = thing._mock_parent
+ return _CallList(reversed(vals))
+
+
+call = _Call(from_kall=False)
+
+
+def create_autospec(spec, spec_set=False, instance=False, _parent=None,
+ _name=None, *, unsafe=False, **kwargs):
+ """Create a mock object using another object as a spec. Attributes on the
+ mock will use the corresponding attribute on the `spec` object as their
+ spec.
+
+ Functions or methods being mocked will have their arguments checked
+ to check that they are called with the correct signature.
+
+ If `spec_set` is True then attempting to set attributes that don't exist
+ on the spec object will raise an `AttributeError`.
+
+ If a class is used as a spec then the return value of the mock (the
+ instance of the class) will have the same spec. You can use a class as the
+ spec for an instance object by passing `instance=True`. The returned mock
+ will only be callable if instances of the mock are callable.
+
+ `create_autospec` will raise a `RuntimeError` if passed some common
+ misspellings of the arguments autospec and spec_set. Pass the argument
+ `unsafe` with the value True to disable that check.
+
+ `create_autospec` also takes arbitrary keyword arguments that are passed to
+ the constructor of the created mock."""
+ if _is_list(spec):
+ # can't pass a list instance to the mock constructor as it will be
+ # interpreted as a list of strings
+ spec = type(spec)
+
+ is_type = isinstance(spec, type)
+ if _is_instance_mock(spec):
+ raise InvalidSpecError(f'Cannot autospec a Mock object. '
+ f'[object={spec!r}]')
+ is_async_func = _is_async_func(spec)
+ _kwargs = {'spec': spec}
+ if spec_set:
+ _kwargs = {'spec_set': spec}
+ elif spec is None:
+ # None we mock with a normal mock without a spec
+ _kwargs = {}
+ if _kwargs and instance:
+ _kwargs['_spec_as_instance'] = True
+ if not unsafe:
+ _check_spec_arg_typos(kwargs)
+
+ _kwargs.update(kwargs)
+
+ Klass = MagicMock
+ if inspect.isdatadescriptor(spec):
+ # descriptors don't have a spec
+ # because we don't know what type they return
+ _kwargs = {}
+ elif is_async_func:
+ if instance:
+ raise RuntimeError("Instance can not be True when create_autospec "
+ "is mocking an async function")
+ Klass = AsyncMock
+ elif not _callable(spec):
+ Klass = NonCallableMagicMock
+ elif is_type and instance and not _instance_callable(spec):
+ Klass = NonCallableMagicMock
+
+ _name = _kwargs.pop('name', _name)
+
+ _new_name = _name
+ if _parent is None:
+ # for a top level object no _new_name should be set
+ _new_name = ''
+
+ mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
+ name=_name, **_kwargs)
+
+ if isinstance(spec, FunctionTypes):
+ # should only happen at the top level because we don't
+ # recurse for functions
+ mock = _set_signature(mock, spec)
+ if is_async_func:
+ _setup_async_mock(mock)
+ else:
+ _check_signature(spec, mock, is_type, instance)
+
+ if _parent is not None and not instance:
+ _parent._mock_children[_name] = mock
+
+ if is_type and not instance and 'return_value' not in kwargs:
+ mock.return_value = create_autospec(spec, spec_set, instance=True,
+ _name='()', _parent=mock)
+
+ for entry in dir(spec):
+ if _is_magic(entry):
+ # MagicMock already does the useful magic methods for us
+ continue
+
+ # XXXX do we need a better way of getting attributes without
+ # triggering code execution (?) Probably not - we need the actual
+ # object to mock it so we would rather trigger a property than mock
+ # the property descriptor. Likewise we want to mock out dynamically
+ # provided attributes.
+ # XXXX what about attributes that raise exceptions other than
+ # AttributeError on being fetched?
+ # we could be resilient against it, or catch and propagate the
+ # exception when the attribute is fetched from the mock
+ try:
+ original = getattr(spec, entry)
+ except AttributeError:
+ continue
+
+ kwargs = {'spec': original}
+ if spec_set:
+ kwargs = {'spec_set': original}
+
+ if not isinstance(original, FunctionTypes):
+ new = _SpecState(original, spec_set, mock, entry, instance)
+ mock._mock_children[entry] = new
+ else:
+ parent = mock
+ if isinstance(spec, FunctionTypes):
+ parent = mock.mock
+
+ skipfirst = _must_skip(spec, entry, is_type)
+ kwargs['_eat_self'] = skipfirst
+ if iscoroutinefunction(original):
+ child_klass = AsyncMock
+ else:
+ child_klass = MagicMock
+ new = child_klass(parent=parent, name=entry, _new_name=entry,
+ _new_parent=parent,
+ **kwargs)
+ mock._mock_children[entry] = new
+ _check_signature(original, new, skipfirst=skipfirst)
+
+ # so functions created with _set_signature become instance attributes,
+ # *plus* their underlying mock exists in _mock_children of the parent
+ # mock. Adding to _mock_children may be unnecessary where we are also
+ # setting as an instance attribute?
+ if isinstance(new, FunctionTypes):
+ setattr(mock, entry, new)
+
+ return mock
+
+
+def _must_skip(spec, entry, is_type):
+ """
+ Return whether we should skip the first argument on spec's `entry`
+ attribute.
+ """
+ if not isinstance(spec, type):
+ if entry in getattr(spec, '__dict__', {}):
+ # instance attribute - shouldn't skip
+ return False
+ spec = spec.__class__
+
+ for klass in spec.__mro__:
+ result = klass.__dict__.get(entry, DEFAULT)
+ if result is DEFAULT:
+ continue
+ if isinstance(result, (staticmethod, classmethod)):
+ return False
+ elif isinstance(result, FunctionTypes):
+ # Normal method => skip if looked up on type
+ # (if looked up on instance, self is already skipped)
+ return is_type
+ else:
+ return False
+
+ # function is a dynamically provided attribute
+ return is_type
+
+
+class _SpecState(object):
+
+ def __init__(self, spec, spec_set=False, parent=None,
+ name=None, ids=None, instance=False):
+ self.spec = spec
+ self.ids = ids
+ self.spec_set = spec_set
+ self.parent = parent
+ self.instance = instance
+ self.name = name
+
+
+FunctionTypes = (
+ # python function
+ type(create_autospec),
+ # instance method
+ type(ANY.__eq__),
+)
+
+
+file_spec = None
+
+
+def _to_stream(read_data):
+ if isinstance(read_data, bytes):
+ return io.BytesIO(read_data)
+ else:
+ return io.StringIO(read_data)
+
+
+def mock_open(mock=None, read_data=''):
+ """
+ A helper function to create a mock to replace the use of `open`. It works
+ for `open` called directly or used as a context manager.
+
+ The `mock` argument is the mock object to configure. If `None` (the
+ default) then a `MagicMock` will be created for you, with the API limited
+ to methods or attributes available on standard file handles.
+
+ `read_data` is a string for the `read`, `readline` and `readlines` of the
+ file handle to return. This is an empty string by default.
+ """
+ _read_data = _to_stream(read_data)
+ _state = [_read_data, None]
+
+ def _readlines_side_effect(*args, **kwargs):
+ if handle.readlines.return_value is not None:
+ return handle.readlines.return_value
+ return _state[0].readlines(*args, **kwargs)
+
+ def _read_side_effect(*args, **kwargs):
+ if handle.read.return_value is not None:
+ return handle.read.return_value
+ return _state[0].read(*args, **kwargs)
+
+ def _readline_side_effect(*args, **kwargs):
+ yield from _iter_side_effect()
+ while True:
+ yield _state[0].readline(*args, **kwargs)
+
+ def _iter_side_effect():
+ if handle.readline.return_value is not None:
+ while True:
+ yield handle.readline.return_value
+ for line in _state[0]:
+ yield line
+
+ def _next_side_effect():
+ if handle.readline.return_value is not None:
+ return handle.readline.return_value
+ return next(_state[0])
+
+ global file_spec
+ if file_spec is None:
+ import _io
+ file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
+
+ if mock is None:
+ mock = MagicMock(name='open', spec=open)
+
+ handle = MagicMock(spec=file_spec)
+ handle.__enter__.return_value = handle
+
+ handle.write.return_value = None
+ handle.read.return_value = None
+ handle.readline.return_value = None
+ handle.readlines.return_value = None
+
+ handle.read.side_effect = _read_side_effect
+ _state[1] = _readline_side_effect()
+ handle.readline.side_effect = _state[1]
+ handle.readlines.side_effect = _readlines_side_effect
+ handle.__iter__.side_effect = _iter_side_effect
+ handle.__next__.side_effect = _next_side_effect
+
+ def reset_data(*args, **kwargs):
+ _state[0] = _to_stream(read_data)
+ if handle.readline.side_effect == _state[1]:
+ # Only reset the side effect if the user hasn't overridden it.
+ _state[1] = _readline_side_effect()
+ handle.readline.side_effect = _state[1]
+ return DEFAULT
+
+ mock.side_effect = reset_data
+ mock.return_value = handle
+ return mock
+
+
+class PropertyMock(Mock):
+ """
+ A mock intended to be used as a property, or other descriptor, on a class.
+ `PropertyMock` provides `__get__` and `__set__` methods so you can specify
+ a return value when it is fetched.
+
+ Fetching a `PropertyMock` instance from an object calls the mock, with
+ no args. Setting it calls the mock with the value being set.
+ """
+ def _get_child_mock(self, /, **kwargs):
+ return MagicMock(**kwargs)
+
+ def __get__(self, obj, obj_type=None):
+ return self()
+ def __set__(self, obj, val):
+ self(val)
+
+
+def seal(mock):
+ """Disable the automatic generation of child mocks.
+
+ Given an input Mock, seals it to ensure no further mocks will be generated
+ when accessing an attribute that was not already defined.
+
+ The operation recursively seals the mock passed in, meaning that
+ the mock itself, any mocks generated by accessing one of its attributes,
+ and all assigned mocks without a name or spec will be sealed.
+ """
+ mock._mock_sealed = True
+ for attr in dir(mock):
+ try:
+ m = getattr(mock, attr)
+ except AttributeError:
+ continue
+ if not isinstance(m, NonCallableMock):
+ continue
+ if isinstance(m._mock_children.get(attr), _SpecState):
+ continue
+ if m._mock_new_parent is mock:
+ seal(m)
+
+
+class _AsyncIterator:
+ """
+ Wraps an iterator in an asynchronous iterator.
+ """
+ def __init__(self, iterator):
+ self.iterator = iterator
+ code_mock = NonCallableMock(spec_set=CodeType)
+ code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
+ self.__dict__['__code__'] = code_mock
+
+ async def __anext__(self):
+ try:
+ return next(self.iterator)
+ except StopIteration:
+ pass
+ raise StopAsyncIteration
diff --git a/lib/python3.10/unittest/result.py b/lib/python3.10/unittest/result.py
new file mode 100644
index 0000000..3da7005
--- /dev/null
+++ b/lib/python3.10/unittest/result.py
@@ -0,0 +1,242 @@
+"""Test result object"""
+
+import io
+import sys
+import traceback
+
+from . import util
+from functools import wraps
+
+__unittest = True
+
+def failfast(method):
+ @wraps(method)
+ def inner(self, *args, **kw):
+ if getattr(self, 'failfast', False):
+ self.stop()
+ return method(self, *args, **kw)
+ return inner
+
+STDOUT_LINE = '\nStdout:\n%s'
+STDERR_LINE = '\nStderr:\n%s'
+
+
+class TestResult(object):
+ """Holder for test result information.
+
+ Test results are automatically managed by the TestCase and TestSuite
+ classes, and do not need to be explicitly manipulated by writers of tests.
+
+ Each instance holds the total number of tests run, and collections of
+ failures and errors that occurred among those test runs. The collections
+ contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
+ formatted traceback of the error that occurred.
+ """
+ _previousTestClass = None
+ _testRunEntered = False
+ _moduleSetUpFailed = False
+ def __init__(self, stream=None, descriptions=None, verbosity=None):
+ self.failfast = False
+ self.failures = []
+ self.errors = []
+ self.testsRun = 0
+ self.skipped = []
+ self.expectedFailures = []
+ self.unexpectedSuccesses = []
+ self.shouldStop = False
+ self.buffer = False
+ self.tb_locals = False
+ self._stdout_buffer = None
+ self._stderr_buffer = None
+ self._original_stdout = sys.stdout
+ self._original_stderr = sys.stderr
+ self._mirrorOutput = False
+
+ def printErrors(self):
+ "Called by TestRunner after test run"
+
+ def startTest(self, test):
+ "Called when the given test is about to be run"
+ self.testsRun += 1
+ self._mirrorOutput = False
+ self._setupStdout()
+
+ def _setupStdout(self):
+ if self.buffer:
+ if self._stderr_buffer is None:
+ self._stderr_buffer = io.StringIO()
+ self._stdout_buffer = io.StringIO()
+ sys.stdout = self._stdout_buffer
+ sys.stderr = self._stderr_buffer
+
+ def startTestRun(self):
+ """Called once before any tests are executed.
+
+ See startTest for a method called before each test.
+ """
+
+ def stopTest(self, test):
+ """Called when the given test has been run"""
+ self._restoreStdout()
+ self._mirrorOutput = False
+
+ def _restoreStdout(self):
+ if self.buffer:
+ if self._mirrorOutput:
+ output = sys.stdout.getvalue()
+ error = sys.stderr.getvalue()
+ if output:
+ if not output.endswith('\n'):
+ output += '\n'
+ self._original_stdout.write(STDOUT_LINE % output)
+ if error:
+ if not error.endswith('\n'):
+ error += '\n'
+ self._original_stderr.write(STDERR_LINE % error)
+
+ sys.stdout = self._original_stdout
+ sys.stderr = self._original_stderr
+ self._stdout_buffer.seek(0)
+ self._stdout_buffer.truncate()
+ self._stderr_buffer.seek(0)
+ self._stderr_buffer.truncate()
+
+ def stopTestRun(self):
+ """Called once after all tests are executed.
+
+ See stopTest for a method called after each test.
+ """
+
+ @failfast
+ def addError(self, test, err):
+ """Called when an error has occurred. 'err' is a tuple of values as
+ returned by sys.exc_info().
+ """
+ self.errors.append((test, self._exc_info_to_string(err, test)))
+ self._mirrorOutput = True
+
+ @failfast
+ def addFailure(self, test, err):
+ """Called when an error has occurred. 'err' is a tuple of values as
+ returned by sys.exc_info()."""
+ self.failures.append((test, self._exc_info_to_string(err, test)))
+ self._mirrorOutput = True
+
+ def addSubTest(self, test, subtest, err):
+ """Called at the end of a subtest.
+ 'err' is None if the subtest ended successfully, otherwise it's a
+ tuple of values as returned by sys.exc_info().
+ """
+ # By default, we don't do anything with successful subtests, but
+ # more sophisticated test results might want to record them.
+ if err is not None:
+ if getattr(self, 'failfast', False):
+ self.stop()
+ if issubclass(err[0], test.failureException):
+ errors = self.failures
+ else:
+ errors = self.errors
+ errors.append((subtest, self._exc_info_to_string(err, test)))
+ self._mirrorOutput = True
+
+ def addSuccess(self, test):
+ "Called when a test has completed successfully"
+ pass
+
+ def addSkip(self, test, reason):
+ """Called when a test is skipped."""
+ self.skipped.append((test, reason))
+
+ def addExpectedFailure(self, test, err):
+ """Called when an expected failure/error occurred."""
+ self.expectedFailures.append(
+ (test, self._exc_info_to_string(err, test)))
+
+ @failfast
+ def addUnexpectedSuccess(self, test):
+ """Called when a test was expected to fail, but succeed."""
+ self.unexpectedSuccesses.append(test)
+
+ def wasSuccessful(self):
+ """Tells whether or not this result was a success."""
+ # The hasattr check is for test_result's OldResult test. That
+ # way this method works on objects that lack the attribute.
+ # (where would such result instances come from? old stored pickles?)
+ return ((len(self.failures) == len(self.errors) == 0) and
+ (not hasattr(self, 'unexpectedSuccesses') or
+ len(self.unexpectedSuccesses) == 0))
+
+ def stop(self):
+ """Indicates that the tests should be aborted."""
+ self.shouldStop = True
+
+ def _exc_info_to_string(self, err, test):
+ """Converts a sys.exc_info()-style tuple of values into a string."""
+ exctype, value, tb = err
+ tb = self._clean_tracebacks(exctype, value, tb, test)
+ tb_e = traceback.TracebackException(
+ exctype, value, tb,
+ capture_locals=self.tb_locals, compact=True)
+ msgLines = list(tb_e.format())
+
+ if self.buffer:
+ output = sys.stdout.getvalue()
+ error = sys.stderr.getvalue()
+ if output:
+ if not output.endswith('\n'):
+ output += '\n'
+ msgLines.append(STDOUT_LINE % output)
+ if error:
+ if not error.endswith('\n'):
+ error += '\n'
+ msgLines.append(STDERR_LINE % error)
+ return ''.join(msgLines)
+
+ def _clean_tracebacks(self, exctype, value, tb, test):
+ ret = None
+ first = True
+ excs = [(exctype, value, tb)]
+ while excs:
+ (exctype, value, tb) = excs.pop()
+ # Skip test runner traceback levels
+ while tb and self._is_relevant_tb_level(tb):
+ tb = tb.tb_next
+
+ # Skip assert*() traceback levels
+ if exctype is test.failureException:
+ self._remove_unittest_tb_frames(tb)
+
+ if first:
+ ret = tb
+ first = False
+ else:
+ value.__traceback__ = tb
+
+ if value is not None:
+ for c in (value.__cause__, value.__context__):
+ if c is not None:
+ excs.append((type(c), c, c.__traceback__))
+ return ret
+
+ def _is_relevant_tb_level(self, tb):
+ return '__unittest' in tb.tb_frame.f_globals
+
+ def _remove_unittest_tb_frames(self, tb):
+ '''Truncates usercode tb at the first unittest frame.
+
+ If the first frame of the traceback is in user code,
+ the prefix up to the first unittest frame is returned.
+ If the first frame is already in the unittest module,
+ the traceback is not modified.
+ '''
+ prev = None
+ while tb and not self._is_relevant_tb_level(tb):
+ prev = tb
+ tb = tb.tb_next
+ if prev is not None:
+ prev.tb_next = None
+
+ def __repr__(self):
+ return ("<%s run=%i errors=%i failures=%i>" %
+ (util.strclass(self.__class__), self.testsRun, len(self.errors),
+ len(self.failures)))
diff --git a/lib/python3.10/unittest/runner.py b/lib/python3.10/unittest/runner.py
new file mode 100644
index 0000000..caf1590
--- /dev/null
+++ b/lib/python3.10/unittest/runner.py
@@ -0,0 +1,230 @@
+"""Running tests"""
+
+import sys
+import time
+import warnings
+
+from . import result
+from .signals import registerResult
+
+__unittest = True
+
+
+class _WritelnDecorator(object):
+ """Used to decorate file-like objects with a handy 'writeln' method"""
+ def __init__(self,stream):
+ self.stream = stream
+
+ def __getattr__(self, attr):
+ if attr in ('stream', '__getstate__'):
+ raise AttributeError(attr)
+ return getattr(self.stream,attr)
+
+ def writeln(self, arg=None):
+ if arg:
+ self.write(arg)
+ self.write('\n') # text-mode streams translate to \r\n if needed
+
+
+class TextTestResult(result.TestResult):
+ """A test result class that can print formatted text results to a stream.
+
+ Used by TextTestRunner.
+ """
+ separator1 = '=' * 70
+ separator2 = '-' * 70
+
+ def __init__(self, stream, descriptions, verbosity):
+ super(TextTestResult, self).__init__(stream, descriptions, verbosity)
+ self.stream = stream
+ self.showAll = verbosity > 1
+ self.dots = verbosity == 1
+ self.descriptions = descriptions
+
+ def getDescription(self, test):
+ doc_first_line = test.shortDescription()
+ if self.descriptions and doc_first_line:
+ return '\n'.join((str(test), doc_first_line))
+ else:
+ return str(test)
+
+ def startTest(self, test):
+ super(TextTestResult, self).startTest(test)
+ if self.showAll:
+ self.stream.write(self.getDescription(test))
+ self.stream.write(" ... ")
+ self.stream.flush()
+
+ def addSuccess(self, test):
+ super(TextTestResult, self).addSuccess(test)
+ if self.showAll:
+ self.stream.writeln("ok")
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write('.')
+ self.stream.flush()
+
+ def addError(self, test, err):
+ super(TextTestResult, self).addError(test, err)
+ if self.showAll:
+ self.stream.writeln("ERROR")
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write('E')
+ self.stream.flush()
+
+ def addFailure(self, test, err):
+ super(TextTestResult, self).addFailure(test, err)
+ if self.showAll:
+ self.stream.writeln("FAIL")
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write('F')
+ self.stream.flush()
+
+ def addSkip(self, test, reason):
+ super(TextTestResult, self).addSkip(test, reason)
+ if self.showAll:
+ self.stream.writeln("skipped {0!r}".format(reason))
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write("s")
+ self.stream.flush()
+
+ def addExpectedFailure(self, test, err):
+ super(TextTestResult, self).addExpectedFailure(test, err)
+ if self.showAll:
+ self.stream.writeln("expected failure")
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write("x")
+ self.stream.flush()
+
+ def addUnexpectedSuccess(self, test):
+ super(TextTestResult, self).addUnexpectedSuccess(test)
+ if self.showAll:
+ self.stream.writeln("unexpected success")
+ self.stream.flush()
+ elif self.dots:
+ self.stream.write("u")
+ self.stream.flush()
+
+ def printErrors(self):
+ if self.dots or self.showAll:
+ self.stream.writeln()
+ self.stream.flush()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ def printErrorList(self, flavour, errors):
+ for test, err in errors:
+ self.stream.writeln(self.separator1)
+ self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
+ self.stream.writeln(self.separator2)
+ self.stream.writeln("%s" % err)
+ self.stream.flush()
+
+
+class TextTestRunner(object):
+ """A test runner class that displays results in textual form.
+
+ It prints out the names of tests as they are run, errors as they
+ occur, and a summary of the results at the end of the test run.
+ """
+ resultclass = TextTestResult
+
+ def __init__(self, stream=None, descriptions=True, verbosity=1,
+ failfast=False, buffer=False, resultclass=None, warnings=None,
+ *, tb_locals=False):
+ """Construct a TextTestRunner.
+
+ Subclasses should accept **kwargs to ensure compatibility as the
+ interface changes.
+ """
+ if stream is None:
+ stream = sys.stderr
+ self.stream = _WritelnDecorator(stream)
+ self.descriptions = descriptions
+ self.verbosity = verbosity
+ self.failfast = failfast
+ self.buffer = buffer
+ self.tb_locals = tb_locals
+ self.warnings = warnings
+ if resultclass is not None:
+ self.resultclass = resultclass
+
+ def _makeResult(self):
+ return self.resultclass(self.stream, self.descriptions, self.verbosity)
+
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = self._makeResult()
+ registerResult(result)
+ result.failfast = self.failfast
+ result.buffer = self.buffer
+ result.tb_locals = self.tb_locals
+ with warnings.catch_warnings():
+ if self.warnings:
+ # if self.warnings is set, use it to filter all the warnings
+ warnings.simplefilter(self.warnings)
+ # if the filter is 'default' or 'always', special-case the
+ # warnings from the deprecated unittest methods to show them
+ # no more than once per module, because they can be fairly
+ # noisy. The -Wd and -Wa flags can be used to bypass this
+ # only when self.warnings is None.
+ if self.warnings in ['default', 'always']:
+ warnings.filterwarnings('module',
+ category=DeprecationWarning,
+ message=r'Please use assert\w+ instead.')
+ startTime = time.perf_counter()
+ startTestRun = getattr(result, 'startTestRun', None)
+ if startTestRun is not None:
+ startTestRun()
+ try:
+ test(result)
+ finally:
+ stopTestRun = getattr(result, 'stopTestRun', None)
+ if stopTestRun is not None:
+ stopTestRun()
+ stopTime = time.perf_counter()
+ timeTaken = stopTime - startTime
+ result.printErrors()
+ if hasattr(result, 'separator2'):
+ self.stream.writeln(result.separator2)
+ run = result.testsRun
+ self.stream.writeln("Ran %d test%s in %.3fs" %
+ (run, run != 1 and "s" or "", timeTaken))
+ self.stream.writeln()
+
+ expectedFails = unexpectedSuccesses = skipped = 0
+ try:
+ results = map(len, (result.expectedFailures,
+ result.unexpectedSuccesses,
+ result.skipped))
+ except AttributeError:
+ pass
+ else:
+ expectedFails, unexpectedSuccesses, skipped = results
+
+ infos = []
+ if not result.wasSuccessful():
+ self.stream.write("FAILED")
+ failed, errored = len(result.failures), len(result.errors)
+ if failed:
+ infos.append("failures=%d" % failed)
+ if errored:
+ infos.append("errors=%d" % errored)
+ else:
+ self.stream.write("OK")
+ if skipped:
+ infos.append("skipped=%d" % skipped)
+ if expectedFails:
+ infos.append("expected failures=%d" % expectedFails)
+ if unexpectedSuccesses:
+ infos.append("unexpected successes=%d" % unexpectedSuccesses)
+ if infos:
+ self.stream.writeln(" (%s)" % (", ".join(infos),))
+ else:
+ self.stream.write("\n")
+ self.stream.flush()
+ return result
diff --git a/lib/python3.10/unittest/signals.py b/lib/python3.10/unittest/signals.py
new file mode 100644
index 0000000..e6a5fc5
--- /dev/null
+++ b/lib/python3.10/unittest/signals.py
@@ -0,0 +1,71 @@
+import signal
+import weakref
+
+from functools import wraps
+
+__unittest = True
+
+
+class _InterruptHandler(object):
+ def __init__(self, default_handler):
+ self.called = False
+ self.original_handler = default_handler
+ if isinstance(default_handler, int):
+ if default_handler == signal.SIG_DFL:
+ # Pretend it's signal.default_int_handler instead.
+ default_handler = signal.default_int_handler
+ elif default_handler == signal.SIG_IGN:
+ # Not quite the same thing as SIG_IGN, but the closest we
+ # can make it: do nothing.
+ def default_handler(unused_signum, unused_frame):
+ pass
+ else:
+ raise TypeError("expected SIGINT signal handler to be "
+ "signal.SIG_IGN, signal.SIG_DFL, or a "
+ "callable object")
+ self.default_handler = default_handler
+
+ def __call__(self, signum, frame):
+ installed_handler = signal.getsignal(signal.SIGINT)
+ if installed_handler is not self:
+ # if we aren't the installed handler, then delegate immediately
+ # to the default handler
+ self.default_handler(signum, frame)
+
+ if self.called:
+ self.default_handler(signum, frame)
+ self.called = True
+ for result in _results.keys():
+ result.stop()
+
+_results = weakref.WeakKeyDictionary()
+def registerResult(result):
+ _results[result] = 1
+
+def removeResult(result):
+ return bool(_results.pop(result, None))
+
+_interrupt_handler = None
+def installHandler():
+ global _interrupt_handler
+ if _interrupt_handler is None:
+ default_handler = signal.getsignal(signal.SIGINT)
+ _interrupt_handler = _InterruptHandler(default_handler)
+ signal.signal(signal.SIGINT, _interrupt_handler)
+
+
+def removeHandler(method=None):
+ if method is not None:
+ @wraps(method)
+ def inner(*args, **kwargs):
+ initial = signal.getsignal(signal.SIGINT)
+ removeHandler()
+ try:
+ return method(*args, **kwargs)
+ finally:
+ signal.signal(signal.SIGINT, initial)
+ return inner
+
+ global _interrupt_handler
+ if _interrupt_handler is not None:
+ signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
diff --git a/lib/python3.10/unittest/suite.py b/lib/python3.10/unittest/suite.py
new file mode 100644
index 0000000..6f45b6f
--- /dev/null
+++ b/lib/python3.10/unittest/suite.py
@@ -0,0 +1,379 @@
+"""TestSuite"""
+
+import sys
+
+from . import case
+from . import util
+
+__unittest = True
+
+
+def _call_if_exists(parent, attr):
+ func = getattr(parent, attr, lambda: None)
+ func()
+
+
+class BaseTestSuite(object):
+ """A simple test suite that doesn't provide class or module shared fixtures.
+ """
+ _cleanup = True
+
+ def __init__(self, tests=()):
+ self._tests = []
+ self._removed_tests = 0
+ self.addTests(tests)
+
+ def __repr__(self):
+ return "<%s tests=%s>" % (util.strclass(self.__class__), list(self))
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return NotImplemented
+ return list(self) == list(other)
+
+ def __iter__(self):
+ return iter(self._tests)
+
+ def countTestCases(self):
+ cases = self._removed_tests
+ for test in self:
+ if test:
+ cases += test.countTestCases()
+ return cases
+
+ def addTest(self, test):
+ # sanity checks
+ if not callable(test):
+ raise TypeError("{} is not callable".format(repr(test)))
+ if isinstance(test, type) and issubclass(test,
+ (case.TestCase, TestSuite)):
+ raise TypeError("TestCases and TestSuites must be instantiated "
+ "before passing them to addTest()")
+ self._tests.append(test)
+
+ def addTests(self, tests):
+ if isinstance(tests, str):
+ raise TypeError("tests must be an iterable of tests, not a string")
+ for test in tests:
+ self.addTest(test)
+
+ def run(self, result):
+ for index, test in enumerate(self):
+ if result.shouldStop:
+ break
+ test(result)
+ if self._cleanup:
+ self._removeTestAtIndex(index)
+ return result
+
+ def _removeTestAtIndex(self, index):
+ """Stop holding a reference to the TestCase at index."""
+ try:
+ test = self._tests[index]
+ except TypeError:
+ # support for suite implementations that have overridden self._tests
+ pass
+ else:
+ # Some unittest tests add non TestCase/TestSuite objects to
+ # the suite.
+ if hasattr(test, 'countTestCases'):
+ self._removed_tests += test.countTestCases()
+ self._tests[index] = None
+
+ def __call__(self, *args, **kwds):
+ return self.run(*args, **kwds)
+
+ def debug(self):
+ """Run the tests without collecting errors in a TestResult"""
+ for test in self:
+ test.debug()
+
+
+class TestSuite(BaseTestSuite):
+ """A test suite is a composite test consisting of a number of TestCases.
+
+ For use, create an instance of TestSuite, then add test case instances.
+ When all tests have been added, the suite can be passed to a test
+ runner, such as TextTestRunner. It will run the individual test cases
+ in the order in which they were added, aggregating the results. When
+ subclassing, do not forget to call the base class constructor.
+ """
+
+ def run(self, result, debug=False):
+ topLevel = False
+ if getattr(result, '_testRunEntered', False) is False:
+ result._testRunEntered = topLevel = True
+
+ for index, test in enumerate(self):
+ if result.shouldStop:
+ break
+
+ if _isnotsuite(test):
+ self._tearDownPreviousClass(test, result)
+ self._handleModuleFixture(test, result)
+ self._handleClassSetUp(test, result)
+ result._previousTestClass = test.__class__
+
+ if (getattr(test.__class__, '_classSetupFailed', False) or
+ getattr(result, '_moduleSetUpFailed', False)):
+ continue
+
+ if not debug:
+ test(result)
+ else:
+ test.debug()
+
+ if self._cleanup:
+ self._removeTestAtIndex(index)
+
+ if topLevel:
+ self._tearDownPreviousClass(None, result)
+ self._handleModuleTearDown(result)
+ result._testRunEntered = False
+ return result
+
+ def debug(self):
+ """Run the tests without collecting errors in a TestResult"""
+ debug = _DebugResult()
+ self.run(debug, True)
+
+ ################################
+
+ def _handleClassSetUp(self, test, result):
+ previousClass = getattr(result, '_previousTestClass', None)
+ currentClass = test.__class__
+ if currentClass == previousClass:
+ return
+ if result._moduleSetUpFailed:
+ return
+ if getattr(currentClass, "__unittest_skip__", False):
+ return
+
+ failed = False
+ try:
+ currentClass._classSetupFailed = False
+ except TypeError:
+ # test may actually be a function
+ # so its class will be a builtin-type
+ pass
+
+ setUpClass = getattr(currentClass, 'setUpClass', None)
+ doClassCleanups = getattr(currentClass, 'doClassCleanups', None)
+ if setUpClass is not None:
+ _call_if_exists(result, '_setupStdout')
+ try:
+ try:
+ setUpClass()
+ except Exception as e:
+ if isinstance(result, _DebugResult):
+ raise
+ failed = True
+ try:
+ currentClass._classSetupFailed = True
+ except TypeError:
+ pass
+ className = util.strclass(currentClass)
+ self._createClassOrModuleLevelException(result, e,
+ 'setUpClass',
+ className)
+ if failed and doClassCleanups is not None:
+ doClassCleanups()
+ for exc_info in currentClass.tearDown_exceptions:
+ self._createClassOrModuleLevelException(
+ result, exc_info[1], 'setUpClass', className,
+ info=exc_info)
+ finally:
+ _call_if_exists(result, '_restoreStdout')
+
+ def _get_previous_module(self, result):
+ previousModule = None
+ previousClass = getattr(result, '_previousTestClass', None)
+ if previousClass is not None:
+ previousModule = previousClass.__module__
+ return previousModule
+
+
+ def _handleModuleFixture(self, test, result):
+ previousModule = self._get_previous_module(result)
+ currentModule = test.__class__.__module__
+ if currentModule == previousModule:
+ return
+
+ self._handleModuleTearDown(result)
+
+
+ result._moduleSetUpFailed = False
+ try:
+ module = sys.modules[currentModule]
+ except KeyError:
+ return
+ setUpModule = getattr(module, 'setUpModule', None)
+ if setUpModule is not None:
+ _call_if_exists(result, '_setupStdout')
+ try:
+ try:
+ setUpModule()
+ except Exception as e:
+ if isinstance(result, _DebugResult):
+ raise
+ result._moduleSetUpFailed = True
+ self._createClassOrModuleLevelException(result, e,
+ 'setUpModule',
+ currentModule)
+ if result._moduleSetUpFailed:
+ try:
+ case.doModuleCleanups()
+ except Exception as e:
+ self._createClassOrModuleLevelException(result, e,
+ 'setUpModule',
+ currentModule)
+ finally:
+ _call_if_exists(result, '_restoreStdout')
+
+ def _createClassOrModuleLevelException(self, result, exc, method_name,
+ parent, info=None):
+ errorName = f'{method_name} ({parent})'
+ self._addClassOrModuleLevelException(result, exc, errorName, info)
+
+ def _addClassOrModuleLevelException(self, result, exception, errorName,
+ info=None):
+ error = _ErrorHolder(errorName)
+ addSkip = getattr(result, 'addSkip', None)
+ if addSkip is not None and isinstance(exception, case.SkipTest):
+ addSkip(error, str(exception))
+ else:
+ if not info:
+ result.addError(error, sys.exc_info())
+ else:
+ result.addError(error, info)
+
+ def _handleModuleTearDown(self, result):
+ previousModule = self._get_previous_module(result)
+ if previousModule is None:
+ return
+ if result._moduleSetUpFailed:
+ return
+
+ try:
+ module = sys.modules[previousModule]
+ except KeyError:
+ return
+
+ _call_if_exists(result, '_setupStdout')
+ try:
+ tearDownModule = getattr(module, 'tearDownModule', None)
+ if tearDownModule is not None:
+ try:
+ tearDownModule()
+ except Exception as e:
+ if isinstance(result, _DebugResult):
+ raise
+ self._createClassOrModuleLevelException(result, e,
+ 'tearDownModule',
+ previousModule)
+ try:
+ case.doModuleCleanups()
+ except Exception as e:
+ if isinstance(result, _DebugResult):
+ raise
+ self._createClassOrModuleLevelException(result, e,
+ 'tearDownModule',
+ previousModule)
+ finally:
+ _call_if_exists(result, '_restoreStdout')
+
+ def _tearDownPreviousClass(self, test, result):
+ previousClass = getattr(result, '_previousTestClass', None)
+ currentClass = test.__class__
+ if currentClass == previousClass or previousClass is None:
+ return
+ if getattr(previousClass, '_classSetupFailed', False):
+ return
+ if getattr(result, '_moduleSetUpFailed', False):
+ return
+ if getattr(previousClass, "__unittest_skip__", False):
+ return
+
+ tearDownClass = getattr(previousClass, 'tearDownClass', None)
+ doClassCleanups = getattr(previousClass, 'doClassCleanups', None)
+ if tearDownClass is None and doClassCleanups is None:
+ return
+
+ _call_if_exists(result, '_setupStdout')
+ try:
+ if tearDownClass is not None:
+ try:
+ tearDownClass()
+ except Exception as e:
+ if isinstance(result, _DebugResult):
+ raise
+ className = util.strclass(previousClass)
+ self._createClassOrModuleLevelException(result, e,
+ 'tearDownClass',
+ className)
+ if doClassCleanups is not None:
+ doClassCleanups()
+ for exc_info in previousClass.tearDown_exceptions:
+ if isinstance(result, _DebugResult):
+ raise exc_info[1]
+ className = util.strclass(previousClass)
+ self._createClassOrModuleLevelException(result, exc_info[1],
+ 'tearDownClass',
+ className,
+ info=exc_info)
+ finally:
+ _call_if_exists(result, '_restoreStdout')
+
+
+class _ErrorHolder(object):
+ """
+ Placeholder for a TestCase inside a result. As far as a TestResult
+ is concerned, this looks exactly like a unit test. Used to insert
+ arbitrary errors into a test suite run.
+ """
+ # Inspired by the ErrorHolder from Twisted:
+ # http://twistedmatrix.com/trac/browser/trunk/twisted/trial/runner.py
+
+ # attribute used by TestResult._exc_info_to_string
+ failureException = None
+
+ def __init__(self, description):
+ self.description = description
+
+ def id(self):
+ return self.description
+
+ def shortDescription(self):
+ return None
+
+ def __repr__(self):
+ return "<ErrorHolder description=%r>" % (self.description,)
+
+ def __str__(self):
+ return self.id()
+
+ def run(self, result):
+ # could call result.addError(...) - but this test-like object
+ # shouldn't be run anyway
+ pass
+
+ def __call__(self, result):
+ return self.run(result)
+
+ def countTestCases(self):
+ return 0
+
+def _isnotsuite(test):
+ "A crude way to tell apart testcases and suites with duck-typing"
+ try:
+ iter(test)
+ except TypeError:
+ return True
+ return False
+
+
+class _DebugResult(object):
+ "Used by the TestSuite to hold previous class when running in debug."
+ _previousTestClass = None
+ _moduleSetUpFailed = False
+ shouldStop = False
diff --git a/lib/python3.10/unittest/util.py b/lib/python3.10/unittest/util.py
new file mode 100644
index 0000000..050eaed
--- /dev/null
+++ b/lib/python3.10/unittest/util.py
@@ -0,0 +1,170 @@
+"""Various utility functions."""
+
+from collections import namedtuple, Counter
+from os.path import commonprefix
+
+__unittest = True
+
+_MAX_LENGTH = 80
+_PLACEHOLDER_LEN = 12
+_MIN_BEGIN_LEN = 5
+_MIN_END_LEN = 5
+_MIN_COMMON_LEN = 5
+_MIN_DIFF_LEN = _MAX_LENGTH - \
+ (_MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN +
+ _PLACEHOLDER_LEN + _MIN_END_LEN)
+assert _MIN_DIFF_LEN >= 0
+
+def _shorten(s, prefixlen, suffixlen):
+ skip = len(s) - prefixlen - suffixlen
+ if skip > _PLACEHOLDER_LEN:
+ s = '%s[%d chars]%s' % (s[:prefixlen], skip, s[len(s) - suffixlen:])
+ return s
+
+def _common_shorten_repr(*args):
+ args = tuple(map(safe_repr, args))
+ maxlen = max(map(len, args))
+ if maxlen <= _MAX_LENGTH:
+ return args
+
+ prefix = commonprefix(args)
+ prefixlen = len(prefix)
+
+ common_len = _MAX_LENGTH - \
+ (maxlen - prefixlen + _MIN_BEGIN_LEN + _PLACEHOLDER_LEN)
+ if common_len > _MIN_COMMON_LEN:
+ assert _MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN + \
+ (maxlen - prefixlen) < _MAX_LENGTH
+ prefix = _shorten(prefix, _MIN_BEGIN_LEN, common_len)
+ return tuple(prefix + s[prefixlen:] for s in args)
+
+ prefix = _shorten(prefix, _MIN_BEGIN_LEN, _MIN_COMMON_LEN)
+ return tuple(prefix + _shorten(s[prefixlen:], _MIN_DIFF_LEN, _MIN_END_LEN)
+ for s in args)
+
+def safe_repr(obj, short=False):
+ try:
+ result = repr(obj)
+ except Exception:
+ result = object.__repr__(obj)
+ if not short or len(result) < _MAX_LENGTH:
+ return result
+ return result[:_MAX_LENGTH] + ' [truncated]...'
+
+def strclass(cls):
+ return "%s.%s" % (cls.__module__, cls.__qualname__)
+
+def sorted_list_difference(expected, actual):
+ """Finds elements in only one or the other of two, sorted input lists.
+
+ Returns a two-element tuple of lists. The first list contains those
+ elements in the "expected" list but not in the "actual" list, and the
+ second contains those elements in the "actual" list but not in the
+ "expected" list. Duplicate elements in either input list are ignored.
+ """
+ i = j = 0
+ missing = []
+ unexpected = []
+ while True:
+ try:
+ e = expected[i]
+ a = actual[j]
+ if e < a:
+ missing.append(e)
+ i += 1
+ while expected[i] == e:
+ i += 1
+ elif e > a:
+ unexpected.append(a)
+ j += 1
+ while actual[j] == a:
+ j += 1
+ else:
+ i += 1
+ try:
+ while expected[i] == e:
+ i += 1
+ finally:
+ j += 1
+ while actual[j] == a:
+ j += 1
+ except IndexError:
+ missing.extend(expected[i:])
+ unexpected.extend(actual[j:])
+ break
+ return missing, unexpected
+
+
+def unorderable_list_difference(expected, actual):
+ """Same behavior as sorted_list_difference but
+ for lists of unorderable items (like dicts).
+
+ As it does a linear search per item (remove) it
+ has O(n*n) performance."""
+ missing = []
+ while expected:
+ item = expected.pop()
+ try:
+ actual.remove(item)
+ except ValueError:
+ missing.append(item)
+
+ # anything left in actual is unexpected
+ return missing, actual
+
+def three_way_cmp(x, y):
+ """Return -1 if x < y, 0 if x == y and 1 if x > y"""
+ return (x > y) - (x < y)
+
+_Mismatch = namedtuple('Mismatch', 'actual expected value')
+
+def _count_diff_all_purpose(actual, expected):
+ 'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
+ # elements need not be hashable
+ s, t = list(actual), list(expected)
+ m, n = len(s), len(t)
+ NULL = object()
+ result = []
+ for i, elem in enumerate(s):
+ if elem is NULL:
+ continue
+ cnt_s = cnt_t = 0
+ for j in range(i, m):
+ if s[j] == elem:
+ cnt_s += 1
+ s[j] = NULL
+ for j, other_elem in enumerate(t):
+ if other_elem == elem:
+ cnt_t += 1
+ t[j] = NULL
+ if cnt_s != cnt_t:
+ diff = _Mismatch(cnt_s, cnt_t, elem)
+ result.append(diff)
+
+ for i, elem in enumerate(t):
+ if elem is NULL:
+ continue
+ cnt_t = 0
+ for j in range(i, n):
+ if t[j] == elem:
+ cnt_t += 1
+ t[j] = NULL
+ diff = _Mismatch(0, cnt_t, elem)
+ result.append(diff)
+ return result
+
+def _count_diff_hashable(actual, expected):
+ 'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
+ # elements must be hashable
+ s, t = Counter(actual), Counter(expected)
+ result = []
+ for elem, cnt_s in s.items():
+ cnt_t = t.get(elem, 0)
+ if cnt_s != cnt_t:
+ diff = _Mismatch(cnt_s, cnt_t, elem)
+ result.append(diff)
+ for elem, cnt_t in t.items():
+ if elem not in s:
+ diff = _Mismatch(0, cnt_t, elem)
+ result.append(diff)
+ return result