diff options
Diffstat (limited to 'pyfakefs/fake_filesystem_unittest.py')
-rw-r--r-- | pyfakefs/fake_filesystem_unittest.py | 670 |
1 files changed, 432 insertions, 238 deletions
diff --git a/pyfakefs/fake_filesystem_unittest.py b/pyfakefs/fake_filesystem_unittest.py index dbb2e34..6633cb5 100644 --- a/pyfakefs/fake_filesystem_unittest.py +++ b/pyfakefs/fake_filesystem_unittest.py @@ -38,98 +38,114 @@ to `:py:class`pyfakefs.fake_filesystem_unittest.TestCase`. import doctest import functools import inspect +import linecache import shutil import sys import tempfile +import tokenize +from importlib.abc import Loader, MetaPathFinder +from types import ModuleType, TracebackType, FunctionType +from typing import ( + Any, Callable, Dict, List, Set, Tuple, Optional, Union, + AnyStr, Type, Iterator, cast, ItemsView, Sequence +) import unittest import warnings +from unittest import TestSuite from pyfakefs.deprecator import Deprecator -from pyfakefs.fake_filesystem import set_uid, set_gid, reset_ids +from pyfakefs.fake_filesystem import ( + set_uid, set_gid, reset_ids, PatchMode, FakeFile, FakeFilesystem +) from pyfakefs.helpers import IS_PYPY +from pyfakefs.mox3_stubout import StubOutForTesting try: from importlib.machinery import ModuleSpec except ImportError: - ModuleSpec = object + ModuleSpec = object # type: ignore[assignment, misc] from importlib import reload from pyfakefs import fake_filesystem from pyfakefs import fake_filesystem_shutil +from pyfakefs import fake_pathlib from pyfakefs import mox3_stubout -from pyfakefs.extra_packages import pathlib, pathlib2, use_scandir - -if pathlib: - from pyfakefs import fake_pathlib +from pyfakefs.extra_packages import pathlib2, use_scandir if use_scandir: from pyfakefs import fake_scandir OS_MODULE = 'nt' if sys.platform == 'win32' else 'posix' PATH_MODULE = 'ntpath' if sys.platform == 'win32' else 'posixpath' -BUILTIN_MODULE = '__builtin__' - - -def _patchfs(f): - """Internally used to be able to use patchfs without parentheses.""" - - @functools.wraps(f) - def decorated(*args, **kwargs): - with Patcher() as p: - kwargs['fs'] = p.fs - return f(*args, **kwargs) - - return decorated -def patchfs(additional_skip_names=None, - modules_to_reload=None, - modules_to_patch=None, - allow_root_user=True): +def patchfs(_func: Callable = None, *, + additional_skip_names: Optional[ + List[Union[str, ModuleType]]] = None, + modules_to_reload: Optional[List[ModuleType]] = None, + modules_to_patch: Optional[Dict[str, ModuleType]] = None, + allow_root_user: bool = True, + use_known_patches: bool = True, + patch_open_code: PatchMode = PatchMode.OFF, + patch_default_args: bool = False, + use_cache: bool = True) -> Callable: """Convenience decorator to use patcher with additional parameters in a test function. Usage:: @patchfs - test_my_function(fs): - fs.create_file('foo') + def test_my_function(fake_fs): + fake_fs.create_file('foo') @patchfs(allow_root_user=False) - test_with_patcher_args(fs): + def test_with_patcher_args(fs): os.makedirs('foo/bar') """ - def wrap_patchfs(f): + def wrap_patchfs(f: Callable) -> Callable: @functools.wraps(f) def wrapped(*args, **kwargs): with Patcher( additional_skip_names=additional_skip_names, modules_to_reload=modules_to_reload, modules_to_patch=modules_to_patch, - allow_root_user=allow_root_user) as p: - kwargs['fs'] = p.fs + allow_root_user=allow_root_user, + use_known_patches=use_known_patches, + patch_open_code=patch_open_code, + patch_default_args=patch_default_args, + use_cache=use_cache) as p: + args = list(args) + args.append(p.fs) return f(*args, **kwargs) return wrapped - # workaround to be able to use the decorator without using calling syntax - # (the default usage without parameters) - # if using the decorator without parentheses, the first argument here - # will be the wrapped function, so we pass it to the decorator function - # that doesn't use arguments - if inspect.isfunction(additional_skip_names): - return _patchfs(additional_skip_names) + if _func: + if not callable(_func): + raise TypeError( + "Decorator argument is not a function.\n" + "Did you mean `@patchfs(additional_skip_names=...)`?" + ) + if hasattr(_func, 'patchings'): + _func.nr_patches = len(_func.patchings) # type: ignore + return wrap_patchfs(_func) return wrap_patchfs -def load_doctests(loader, tests, ignore, module, - additional_skip_names=None, - modules_to_reload=None, - modules_to_patch=None, - allow_root_user=True): # pylint: disable=unused-argument +def load_doctests( + loader: Any, tests: TestSuite, ignore: Any, module: ModuleType, + additional_skip_names: Optional[ + List[Union[str, ModuleType]]] = None, + modules_to_reload: Optional[List[ModuleType]] = None, + modules_to_patch: Optional[Dict[str, ModuleType]] = None, + allow_root_user: bool = True, + use_known_patches: bool = True, + patch_open_code: PatchMode = PatchMode.OFF, + patch_default_args: bool = False +) -> TestSuite: # pylint:disable=unused-argument """Load the doctest tests for the specified module into unittest. Args: loader, tests, ignore : arguments passed in from `load_tests()` @@ -141,7 +157,10 @@ def load_doctests(loader, tests, ignore, module, _patcher = Patcher(additional_skip_names=additional_skip_names, modules_to_reload=modules_to_reload, modules_to_patch=modules_to_patch, - allow_root_user=allow_root_user) + allow_root_user=allow_root_user, + use_known_patches=use_known_patches, + patch_open_code=patch_open_code, + patch_default_args=patch_default_args) globs = _patcher.replace_globs(vars(module)) tests.addTests(doctest.DocTestSuite(module, globs=globs, @@ -190,19 +209,24 @@ class TestCaseMixin: methodName=methodName, modules_to_reload=[sut]) """ - additional_skip_names = None - modules_to_reload = None - modules_to_patch = None + additional_skip_names: Optional[List[Union[str, ModuleType]]] = None + modules_to_reload: Optional[List[ModuleType]] = None + modules_to_patch: Optional[Dict[str, ModuleType]] = None @property - def fs(self): - return self._stubber.fs + def fs(self) -> FakeFilesystem: + return cast(FakeFilesystem, self._stubber.fs) def setUpPyfakefs(self, - additional_skip_names=None, - modules_to_reload=None, - modules_to_patch=None, - allow_root_user=True): + additional_skip_names: Optional[ + List[Union[str, ModuleType]]] = None, + modules_to_reload: Optional[List[ModuleType]] = None, + modules_to_patch: Optional[Dict[str, ModuleType]] = None, + allow_root_user: bool = True, + use_known_patches: bool = True, + patch_open_code: PatchMode = PatchMode.OFF, + patch_default_args: bool = False, + use_cache: bool = True) -> None: """Bind the file-related modules to the :py:class:`pyfakefs` fake file system instead of the real file system. Also bind the fake `open()` function. @@ -224,13 +248,17 @@ class TestCaseMixin: additional_skip_names=additional_skip_names, modules_to_reload=modules_to_reload, modules_to_patch=modules_to_patch, - allow_root_user=allow_root_user + allow_root_user=allow_root_user, + use_known_patches=use_known_patches, + patch_open_code=patch_open_code, + patch_default_args=patch_default_args, + use_cache=use_cache ) self._stubber.setUp() - self.addCleanup(self._stubber.tearDown) + cast(TestCase, self).addCleanup(self._stubber.tearDown) - def pause(self): + def pause(self) -> None: """Pause the patching of the file system modules until `resume` is called. After that call, all file system calls are executed in the real file system. @@ -239,7 +267,7 @@ class TestCaseMixin: """ self._stubber.pause() - def resume(self): + def resume(self) -> None: """Resume the patching of the file system modules if `pause` has been called before. After that call, all file system calls are executed in the fake file system. @@ -255,11 +283,11 @@ class TestCase(unittest.TestCase, TestCaseMixin): The arguments are explained in :py:class:`TestCaseMixin`. """ - def __init__(self, methodName='runTest', - additional_skip_names=None, - modules_to_reload=None, - modules_to_patch=None, - allow_root_user=True): + def __init__(self, methodName: str = 'runTest', + additional_skip_names: Optional[ + List[Union[str, ModuleType]]] = None, + modules_to_reload: Optional[List[ModuleType]] = None, + modules_to_patch: Optional[Dict[str, ModuleType]] = None): """Creates the test class instance and the patcher used to stub out file system related modules. @@ -267,16 +295,16 @@ class TestCase(unittest.TestCase, TestCaseMixin): methodName: The name of the test method (same as in unittest.TestCase) """ - super(TestCase, self).__init__(methodName) + super().__init__(methodName) self.additional_skip_names = additional_skip_names self.modules_to_reload = modules_to_reload self.modules_to_patch = modules_to_patch - self.allow_root_user = allow_root_user @Deprecator('add_real_file') - def copyRealFile(self, real_file_path, fake_file_path=None, - create_missing_dirs=True): + def copyRealFile(self, real_file_path: AnyStr, + fake_file_path: Optional[AnyStr] = None, + create_missing_dirs: bool = True) -> FakeFile: """Add the file `real_file_path` in the real file system to the same path in the fake file system. @@ -312,10 +340,10 @@ class TestCase(unittest.TestCase, TestCaseMixin): if not create_missing_dirs: raise ValueError("CopyRealFile() is deprecated and no longer " "supports NOT creating missing directories") + assert self._stubber.fs is not None return self._stubber.fs.add_real_file(real_file_path, read_only=False) - @DeprecationWarning - def tearDownPyfakefs(self): + def tearDownPyfakefs(self) -> None: """This method is deprecated and exists only for backward compatibility. It does nothing. """ @@ -338,67 +366,165 @@ class Patcher: '''Stub nothing that is imported within these modules. `sys` is included to prevent `sys.path` from being stubbed with the fake `os.path`. + The `pytest` and `py` modules are used by pytest and have to access the + real file system. + The `linecache` module is used to read the test file in case of test + failure to get traceback information before test tear down. + In order to make sure that reading the test file is not faked, + we skip faking the module. + We also have to set back the cached open function in tokenize. ''' - SKIPMODULES = {None, fake_filesystem, fake_filesystem_shutil, sys} + SKIPMODULES = { + None, fake_filesystem, fake_filesystem_shutil, + sys, linecache, tokenize + } + # caches all modules that do not have file system modules or function + # to speed up _find_modules + CACHED_MODULES: Set[ModuleType] = set() + FS_MODULES: Dict[str, Set[Tuple[ModuleType, str]]] = {} + FS_FUNCTIONS: Dict[Tuple[str, str, str], Set[ModuleType]] = {} + FS_DEFARGS: List[Tuple[FunctionType, int, Callable[..., Any]]] = [] + SKIPPED_FS_MODULES: Dict[str, Set[Tuple[ModuleType, str]]] = {} + assert None in SKIPMODULES, ("sys.modules contains 'None' values;" " must skip them.") IS_WINDOWS = sys.platform in ('win32', 'cygwin') - SKIPNAMES = {'os', 'path', 'io', 'genericpath', OS_MODULE, PATH_MODULE} - if pathlib: - SKIPNAMES.add('pathlib') - if pathlib2: - SKIPNAMES.add('pathlib2') - - def __init__(self, additional_skip_names=None, - modules_to_reload=None, modules_to_patch=None, - allow_root_user=True): - """For a description of the arguments, see TestCase.__init__""" + SKIPNAMES = {'os', 'path', 'io', 'genericpath', 'fcntl', + OS_MODULE, PATH_MODULE} + + # hold values from last call - if changed, the cache has to be invalidated + PATCHED_MODULE_NAMES: Set[str] = set() + ADDITIONAL_SKIP_NAMES: Set[str] = set() + PATCH_DEFAULT_ARGS = False + + def __init__(self, additional_skip_names: Optional[ + List[Union[str, ModuleType]]] = None, + modules_to_reload: Optional[List[ModuleType]] = None, + modules_to_patch: Optional[Dict[str, ModuleType]] = None, + allow_root_user: bool = True, + use_known_patches: bool = True, + patch_open_code: PatchMode = PatchMode.OFF, + patch_default_args: bool = False, + use_cache: bool = True) -> None: + """ + Args: + additional_skip_names: names of modules inside of which no module + replacement shall be performed, in addition to the names in + :py:attr:`fake_filesystem_unittest.Patcher.SKIPNAMES`. + Instead of the module names, the modules themselves + may be used. + modules_to_reload: A list of modules that need to be reloaded + to be patched dynamically; may be needed if the module + imports file system modules under an alias + + .. caution:: Reloading modules may have unwanted side effects. + modules_to_patch: A dictionary of fake modules mapped to the + fully qualified patched module names. Can be used to add + patching of modules not provided by `pyfakefs`. + allow_root_user: If True (default), if the test is run as root + user, the user in the fake file system is also considered a + root user, otherwise it is always considered a regular user. + use_known_patches: If True (the default), some patches for commonly + used packages are applied which make them usable with pyfakefs. + patch_open_code: If True, `io.open_code` is patched. The default + is not to patch it, as it mostly is used to load compiled + modules that are not in the fake file system. + patch_default_args: If True, default arguments are checked for + file system functions, which are patched. This check is + expansive, so it is off by default. + use_cache: If True (default), patched and non-patched modules are + cached between tests for performance reasons. As this is a new + feature, this argument allows to turn it off in case it + causes any problems. + """ if not allow_root_user: # set non-root IDs even if the real user is root set_uid(1) set_gid(1) - self._skipNames = self.SKIPNAMES.copy() + self._skip_names = self.SKIPNAMES.copy() # save the original open function for use in pytest plugin self.original_open = open - self.fake_open = None + self.patch_open_code = patch_open_code if additional_skip_names is not None: - skip_names = [m.__name__ if inspect.ismodule(m) else m - for m in additional_skip_names] - self._skipNames.update(skip_names) - - self._fake_module_classes = {} - self._class_modules = {} + skip_names = [ + cast(ModuleType, m).__name__ if inspect.ismodule(m) + else cast(str, m) for m in additional_skip_names + ] + self._skip_names.update(skip_names) + + self._fake_module_classes: Dict[str, Any] = {} + self._unfaked_module_classes: Dict[str, Any] = {} + self._class_modules: Dict[str, List[str]] = {} self._init_fake_module_classes() - self.modules_to_reload = modules_to_reload or [] + # reload tempfile under posix to patch default argument + self.modules_to_reload: List[ModuleType] = ( + [] if sys.platform == 'win32' else [tempfile] + ) + if modules_to_reload is not None: + self.modules_to_reload.extend(modules_to_reload) + self.patch_default_args = patch_default_args + self.use_cache = use_cache + + if use_known_patches: + from pyfakefs.patched_packages import ( + get_modules_to_patch, get_classes_to_patch, + get_fake_module_classes + ) + + modules_to_patch = modules_to_patch or {} + modules_to_patch.update(get_modules_to_patch()) + self._class_modules.update(get_classes_to_patch()) + self._fake_module_classes.update(get_fake_module_classes()) if modules_to_patch is not None: for name, fake_module in modules_to_patch.items(): self._fake_module_classes[name] = fake_module - - self._fake_module_functions = {} + patched_module_names = set(modules_to_patch) + else: + patched_module_names = set() + clear_cache = not use_cache + if use_cache: + if patched_module_names != self.PATCHED_MODULE_NAMES: + self.__class__.PATCHED_MODULE_NAMES = patched_module_names + clear_cache = True + if self._skip_names != self.ADDITIONAL_SKIP_NAMES: + self.__class__.ADDITIONAL_SKIP_NAMES = self._skip_names + clear_cache = True + if patch_default_args != self.PATCH_DEFAULT_ARGS: + self.__class__.PATCH_DEFAULT_ARGS = patch_default_args + clear_cache = True + + if clear_cache: + self.clear_cache() + self._fake_module_functions: Dict[str, Dict] = {} self._init_fake_module_functions() # Attributes set by _refresh() - self._modules = {} - self._fct_modules = {} - self._def_functions = [] - self._open_functions = {} - self._stubs = None - self.fs = None - self.fake_modules = {} - self._dyn_patcher = None + self._stubs: Optional[StubOutForTesting] = None + self.fs: Optional[FakeFilesystem] = None + self.fake_modules: Dict[str, Any] = {} + self.unfaked_modules: Dict[str, Any] = {} # _isStale is set by tearDown(), reset by _refresh() self._isStale = True + self._dyn_patcher: Optional[DynamicPatcher] = None self._patching = False - def _init_fake_module_classes(self): + def clear_cache(self) -> None: + """Clear the module cache.""" + self.__class__.CACHED_MODULES = set() + self.__class__.FS_MODULES = {} + self.__class__.FS_FUNCTIONS = {} + self.__class__.FS_DEFARGS = [] + self.__class__.SKIPPED_FS_MODULES = {} + + def _init_fake_module_classes(self) -> None: # IMPORTANT TESTING NOTE: Whenever you add a new module below, test # it by adding an attribute in fixtures/module_with_attributes.py # and a test in fake_filesystem_unittest_test.py, class @@ -407,31 +533,36 @@ class Patcher: 'os': fake_filesystem.FakeOsModule, 'shutil': fake_filesystem_shutil.FakeShutilModule, 'io': fake_filesystem.FakeIoModule, + 'pathlib': fake_pathlib.FakePathlibModule } if IS_PYPY: # in PyPy io.open, the module is referenced as _io self._fake_module_classes['_io'] = fake_filesystem.FakeIoModule + if sys.platform != 'win32': + self._fake_module_classes[ + 'fcntl'] = fake_filesystem.FakeFcntlModule # class modules maps class names against a list of modules they can # be contained in - this allows for alternative modules like # `pathlib` and `pathlib2` - if pathlib: - self._class_modules['Path'] = [] - if pathlib: - self._fake_module_classes[ - 'pathlib'] = fake_pathlib.FakePathlibModule - self._class_modules['Path'].append('pathlib') - if pathlib2: - self._fake_module_classes[ - 'pathlib2'] = fake_pathlib.FakePathlibModule - self._class_modules['Path'].append('pathlib2') + self._class_modules['Path'] = ['pathlib'] + self._unfaked_module_classes[ + 'pathlib'] = fake_pathlib.RealPathlibModule + if pathlib2: self._fake_module_classes[ - 'Path'] = fake_pathlib.FakePathlibPathModule + 'pathlib2'] = fake_pathlib.FakePathlibModule + self._class_modules['Path'].append('pathlib2') + self._unfaked_module_classes[ + 'pathlib2'] = fake_pathlib.RealPathlibModule + self._fake_module_classes[ + 'Path'] = fake_pathlib.FakePathlibPathModule + self._unfaked_module_classes[ + 'Path'] = fake_pathlib.RealPathlibPathModule if use_scandir: self._fake_module_classes[ 'scandir'] = fake_scandir.FakeScanDirModule - def _init_fake_module_functions(self): + def _init_fake_module_functions(self) -> None: # handle patching function imported separately like # `from os import stat` # each patched function name has to be looked up separately @@ -455,7 +586,7 @@ class Patcher: self._fake_module_functions.setdefault( fct_name, {})[PATH_MODULE] = module_attr - def __enter__(self): + def __enter__(self) -> 'Patcher': """Context manager for usage outside of fake_filesystem_unittest.TestCase. Ensure that all patched modules are removed in case of an @@ -464,120 +595,153 @@ class Patcher: self.setUp() return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType]) -> None: self.tearDown() - def _is_fs_module(self, mod, name, module_names): + def _is_fs_module(self, mod: ModuleType, + name: str, + module_names: List[str]) -> bool: try: - return (inspect.ismodule(mod) and - mod.__name__ in module_names - or inspect.isclass(mod) and - mod.__module__ in self._class_modules.get(name, [])) - except AttributeError: - # handle cases where the module has no __name__ or __module__ - # attribute - see #460 - return False - - def _is_fs_function(self, fct): + # check for __name__ first and ignore the AttributeException + # if it does not exist - avoids calling expansive ismodule + if mod.__name__ in module_names and inspect.ismodule(mod): + return True + except Exception: + pass try: - return ((inspect.isfunction(fct) or - inspect.isbuiltin(fct)) and - fct.__name__ in self._fake_module_functions and + if (name in self._class_modules and + mod.__module__ in self._class_modules[name]): + return inspect.isclass(mod) + except Exception: + # handle AttributeError and any other exception possibly triggered + # by side effects of inspect methods + pass + return False + + def _is_fs_function(self, fct: FunctionType) -> bool: + try: + # check for __name__ first and ignore the AttributeException + # if it does not exist - avoids calling expansive inspect + # methods in most cases + return (fct.__name__ in self._fake_module_functions and fct.__module__ in self._fake_module_functions[ - fct.__name__]) - except AttributeError: - # handle cases where the function has no __name__ or __module__ - # attribute + fct.__name__] and + (inspect.isfunction(fct) or inspect.isbuiltin(fct))) + except Exception: + # handle AttributeError and any other exception possibly triggered + # by side effects of inspect methods return False - def _def_values(self, item): + def _def_values( + self, + item: FunctionType) -> Iterator[Tuple[FunctionType, int, Any]]: """Find default arguments that are file-system functions to be patched in top-level functions and members of top-level classes.""" # check for module-level functions - if inspect.isfunction(item): - if item.__defaults__: + try: + if item.__defaults__ and inspect.isfunction(item): for i, d in enumerate(item.__defaults__): if self._is_fs_function(d): yield item, i, d - elif inspect.isclass(item): - # check for methods in class (nested classes are ignored for now) - try: + except Exception: + pass + try: + if inspect.isclass(item): + # check for methods in class + # (nested classes are ignored for now) + # inspect.getmembers is very expansive! for m in inspect.getmembers(item, predicate=inspect.isfunction): - m = m[1] - if m.__defaults__: - for i, d in enumerate(m.__defaults__): + f = cast(FunctionType, m[1]) + if f.__defaults__: + for i, d in enumerate(f.__defaults__): if self._is_fs_function(d): - yield m, i, d - except Exception: - # Ignore any exception, examples: - # ImportError: No module named '_gdbm' - # _DontDoThat() (see #523) - pass - - def _find_modules(self): + yield f, i, d + except Exception: + # Ignore any exception, examples: + # ImportError: No module named '_gdbm' + # _DontDoThat() (see #523) + pass + + def _find_def_values( + self, module_items: ItemsView[str, FunctionType]) -> None: + for _, fct in module_items: + for f, i, d in self._def_values(fct): + self.__class__.FS_DEFARGS.append((f, i, d)) + + def _find_modules(self) -> None: """Find and cache all modules that import file system modules. Later, `setUp()` will stub these with the fake file system modules. """ - module_names = list(self._fake_module_classes.keys()) + [PATH_MODULE] for name, module in list(sys.modules.items()): try: - if (module in self.SKIPMODULES or - not inspect.ismodule(module) or - module.__name__.split('.')[0] in self._skipNames): + if (self.use_cache and module in self.CACHED_MODULES or + not inspect.ismodule(module)): continue - except AttributeError: + except Exception: # workaround for some py (part of pytest) versions # where py.error has no __name__ attribute # see https://github.com/pytest-dev/py/issues/73 + # and any other exception triggered by inspect.ismodule + if self.use_cache: + self.__class__.CACHED_MODULES.add(module) continue - + skipped = (module in self.SKIPMODULES or + any([sn.startswith(module.__name__) + for sn in self._skip_names])) module_items = module.__dict__.copy().items() - # suppress specific pytest warning - see #466 - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='The compiler package is deprecated', - category=DeprecationWarning, - module='py' - ) - modules = {name: mod for name, mod in module_items - if self._is_fs_module(mod, name, module_names)} - - for name, mod in modules.items(): - self._modules.setdefault(name, set()).add((module, - mod.__name__)) - functions = {name: fct for name, fct in - module_items - if self._is_fs_function(fct)} - - # find default arguments that are file system functions - for _, fct in module_items: - for f, i, d in self._def_values(fct): - self._def_functions.append((f, i, d)) - - for name, fct in functions.items(): - self._fct_modules.setdefault( - (name, fct.__name__, fct.__module__), set()).add(module) - - def _refresh(self): + modules = {name: mod for name, mod in module_items + if self._is_fs_module(mod, name, module_names)} + + if skipped: + for name, mod in modules.items(): + self.__class__.SKIPPED_FS_MODULES.setdefault( + name, set()).add((module, mod.__name__)) + else: + for name, mod in modules.items(): + self.__class__.FS_MODULES.setdefault(name, set()).add( + (module, mod.__name__)) + functions = {name: fct for name, fct in + module_items + if self._is_fs_function(fct)} + + for name, fct in functions.items(): + self.__class__.FS_FUNCTIONS.setdefault( + (name, fct.__name__, fct.__module__), + set()).add(module) + + # find default arguments that are file system functions + if self.patch_default_args: + self._find_def_values(module_items) + + if self.use_cache: + self.__class__.CACHED_MODULES.add(module) + + def _refresh(self) -> None: """Renew the fake file system and set the _isStale flag to `False`.""" if self._stubs is not None: self._stubs.smart_unset_all() self._stubs = mox3_stubout.StubOutForTesting() self.fs = fake_filesystem.FakeFilesystem(patcher=self) + self.fs.patch_open_code = self.patch_open_code for name in self._fake_module_classes: self.fake_modules[name] = self._fake_module_classes[name](self.fs) + if hasattr(self.fake_modules[name], 'skip_names'): + self.fake_modules[name].skip_names = self._skip_names self.fake_modules[PATH_MODULE] = self.fake_modules['os'].path - self.fake_open = fake_filesystem.FakeFileOpen(self.fs) + for name in self._unfaked_module_classes: + self.unfaked_modules[name] = self._unfaked_module_classes[name]() self._isStale = False - def setUp(self, doctester=None): + def setUp(self, doctester: Any = None) -> None: """Bind the file-related modules to the :py:mod:`pyfakefs` fake modules real ones. Also bind the fake `file()` and `open()` functions. """ @@ -585,56 +749,81 @@ class Patcher: hasattr(shutil, '_HAS_FCOPYFILE') and shutil._HAS_FCOPYFILE) if self.has_fcopy_file: - shutil._HAS_FCOPYFILE = False + shutil._HAS_FCOPYFILE = False # type: ignore[attr-defined] temp_dir = tempfile.gettempdir() - self._find_modules() + with warnings.catch_warnings(): + # ignore warnings, see #542 and #614 + warnings.filterwarnings( + 'ignore' + ) + self._find_modules() + self._refresh() if doctester is not None: doctester.globs = self.replace_globs(doctester.globs) self.start_patching() + linecache.open = self.original_open # type: ignore[attr-defined] + tokenize._builtin_open = self.original_open # type: ignore # the temp directory is assumed to exist at least in `tempfile1`, # so we create it here for convenience + assert self.fs is not None self.fs.create_dir(temp_dir) - def start_patching(self): + def start_patching(self) -> None: if not self._patching: self._patching = True - for name, modules in self._modules.items(): - for module, attr in modules: - self._stubs.smart_set( - module, name, self.fake_modules[attr]) - for (name, ft_name, ft_mod), modules in self._fct_modules.items(): - method, mod_name = self._fake_module_functions[ft_name][ft_mod] - fake_module = self.fake_modules[mod_name] - attr = method.__get__(fake_module, fake_module.__class__) - for module in modules: - self._stubs.smart_set(module, name, attr) - - for (fct, idx, ft) in self._def_functions: - method, mod_name = self._fake_module_functions[ - ft.__name__][ft.__module__] - fake_module = self.fake_modules[mod_name] - attr = method.__get__(fake_module, fake_module.__class__) - new_defaults = [] - for i, d in enumerate(fct.__defaults__): - if i == idx: - new_defaults.append(attr) - else: - new_defaults.append(d) - fct.__defaults__ = tuple(new_defaults) + self.patch_modules() + self.patch_functions() + self.patch_defaults() self._dyn_patcher = DynamicPatcher(self) sys.meta_path.insert(0, self._dyn_patcher) for module in self.modules_to_reload: - if module.__name__ in sys.modules: + if sys.modules.get(module.__name__) is module: reload(module) - def replace_globs(self, globs_): + def patch_functions(self) -> None: + assert self._stubs is not None + for (name, ft_name, ft_mod), modules in self.FS_FUNCTIONS.items(): + method, mod_name = self._fake_module_functions[ft_name][ft_mod] + fake_module = self.fake_modules[mod_name] + attr = method.__get__(fake_module, fake_module.__class__) + for module in modules: + self._stubs.smart_set(module, name, attr) + + def patch_modules(self) -> None: + assert self._stubs is not None + for name, modules in self.FS_MODULES.items(): + for module, attr in modules: + self._stubs.smart_set( + module, name, self.fake_modules[attr]) + for name, modules in self.SKIPPED_FS_MODULES.items(): + for module, attr in modules: + if attr in self.unfaked_modules: + self._stubs.smart_set( + module, name, self.unfaked_modules[attr]) + + def patch_defaults(self) -> None: + for (fct, idx, ft) in self.FS_DEFARGS: + method, mod_name = self._fake_module_functions[ + ft.__name__][ft.__module__] + fake_module = self.fake_modules[mod_name] + attr = method.__get__(fake_module, fake_module.__class__) + new_defaults = [] + assert fct.__defaults__ is not None + for i, d in enumerate(fct.__defaults__): + if i == idx: + new_defaults.append(attr) + else: + new_defaults.append(d) + fct.__defaults__ = tuple(new_defaults) + + def replace_globs(self, globs_: Dict[str, Any]) -> Dict[str, Any]: globs = globs_.copy() if self._isStale: self._refresh() @@ -643,35 +832,36 @@ class Patcher: globs[name] = self._fake_module_classes[name](self.fs) return globs - def tearDown(self, doctester=None): + def tearDown(self, doctester: Any = None): """Clear the fake filesystem bindings created by `setUp()`.""" self.stop_patching() if self.has_fcopy_file: - shutil._HAS_FCOPYFILE = True + shutil._HAS_FCOPYFILE = True # type: ignore[attr-defined] reset_ids() - def stop_patching(self): + def stop_patching(self) -> None: if self._patching: self._isStale = True self._patching = False - self._stubs.smart_unset_all() + if self._stubs: + self._stubs.smart_unset_all() self.unset_defaults() - self._dyn_patcher.cleanup() - sys.meta_path.pop(0) + if self._dyn_patcher: + self._dyn_patcher.cleanup() + sys.meta_path.pop(0) - def unset_defaults(self): - for (fct, idx, ft) in self._def_functions: + def unset_defaults(self) -> None: + for (fct, idx, ft) in self.FS_DEFARGS: new_defaults = [] - for i, d in enumerate(fct.__defaults__): + for i, d in enumerate(cast(Tuple, fct.__defaults__)): if i == idx: new_defaults.append(ft) else: new_defaults.append(d) fct.__defaults__ = tuple(new_defaults) - self._def_functions = [] - def pause(self): + def pause(self) -> None: """Pause the patching of the file system modules until `resume` is called. After that call, all file system calls are executed in the real file system. @@ -680,7 +870,7 @@ class Patcher: """ self.stop_patching() - def resume(self): + def resume(self) -> None: """Resume the patching of the file system modules if `pause` has been called before. After that call, all file system calls are executed in the fake file system. @@ -695,7 +885,7 @@ class Pause: going out of it's scope. """ - def __init__(self, caller): + def __init__(self, caller: Union[Patcher, TestCaseMixin, FakeFilesystem]): """Initializes the context manager with the fake filesystem. Args: @@ -703,8 +893,9 @@ class Pause: or the pyfakefs test case. """ if isinstance(caller, (Patcher, TestCaseMixin)): - self._fs = caller.fs - elif isinstance(caller, fake_filesystem.FakeFilesystem): + assert caller.fs is not None + self._fs: FakeFilesystem = caller.fs + elif isinstance(caller, FakeFilesystem): self._fs = caller else: raise ValueError('Invalid argument - should be of type ' @@ -712,25 +903,25 @@ class Pause: '"fake_filesystem_unittest.TestCase" ' 'or "fake_filesystem.FakeFilesystem"') - def __enter__(self): + def __enter__(self) -> FakeFilesystem: self._fs.pause() return self._fs - def __exit__(self, *args): - return self._fs.resume() + def __exit__(self, *args: Any) -> None: + self._fs.resume() -class DynamicPatcher: +class DynamicPatcher(MetaPathFinder, Loader): """A file loader that replaces file system related modules by their fake implementation if they are loaded after calling `setUpPyfakefs()`. Implements the protocol needed for import hooks. """ - def __init__(self, patcher): + def __init__(self, patcher: Patcher) -> None: self._patcher = patcher self.sysmodules = {} self.modules = self._patcher.fake_modules - self._loaded_module_names = set() + self._loaded_module_names: Set[str] = set() # remove all modules that have to be patched from `sys.modules`, # otherwise the find_... methods will not be called @@ -742,9 +933,9 @@ class DynamicPatcher: for name, module in self.modules.items(): sys.modules[name] = module - def cleanup(self): - for module in self.sysmodules: - sys.modules[module] = self.sysmodules[module] + def cleanup(self) -> None: + for module_name in self.sysmodules: + sys.modules[module_name] = self.sysmodules[module_name] for module in self._patcher.modules_to_reload: if module.__name__ in sys.modules: reload(module) @@ -757,7 +948,7 @@ class DynamicPatcher: if name in sys.modules and name not in reloaded_module_names: del sys.modules[name] - def needs_patch(self, name): + def needs_patch(self, name: str) -> bool: """Check if the module with the given name shall be replaced.""" if name not in self.modules: self._loaded_module_names.add(name) @@ -767,12 +958,15 @@ class DynamicPatcher: return False return True - def find_spec(self, fullname, path, target=None): - """Module finder for Python 3.""" + def find_spec(self, fullname: str, + path: Optional[Sequence[Union[bytes, str]]], + target: Optional[ModuleType] = None) -> Optional[ModuleSpec]: + """Module finder.""" if self.needs_patch(fullname): return ModuleSpec(fullname, self) + return None - def load_module(self, fullname): + def load_module(self, fullname: str) -> ModuleType: """Replaces the module by its fake implementation.""" sys.modules[fullname] = self.modules[fullname] return self.modules[fullname] |