aboutsummaryrefslogtreecommitdiff
path: root/catapult/common/py_vulcanize/py_vulcanize/fake_fs.py
blob: dfcb5e60bf4e4df60b0e911c0f9a46fcfef9d1ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import codecs
import os
import sys
import collections
import StringIO


class WithableStringIO(StringIO.StringIO):

  def __enter__(self, *args):
    return self

  def __exit__(self, *args):
    pass


class FakeFS(object):

  def __init__(self, initial_filenames_and_contents=None):
    self._file_contents = {}
    if initial_filenames_and_contents:
      for k, v in initial_filenames_and_contents.iteritems():
        self._file_contents[k] = v

    self._bound = False
    self._real_codecs_open = codecs.open
    self._real_open = sys.modules['__builtin__'].open
    self._real_abspath = os.path.abspath
    self._real_exists = os.path.exists
    self._real_walk = os.walk
    self._real_listdir = os.listdir

  def __enter__(self):
    self.Bind()
    return self

  def __exit__(self, *args):
    self.Unbind()

  def Bind(self):
    assert not self._bound
    codecs.open = self._FakeCodecsOpen
    sys.modules['__builtin__'].open = self._FakeOpen
    os.path.abspath = self._FakeAbspath
    os.path.exists = self._FakeExists
    os.walk = self._FakeWalk
    os.listdir = self._FakeListDir
    self._bound = True

  def Unbind(self):
    assert self._bound
    codecs.open = self._real_codecs_open
    sys.modules['__builtin__'].open = self._real_open
    os.path.abspath = self._real_abspath
    os.path.exists = self._real_exists
    os.walk = self._real_walk
    os.listdir = self._real_listdir
    self._bound = False

  def AddFile(self, path, contents):
    assert path not in self._file_contents
    path = os.path.normpath(path)
    self._file_contents[path] = contents

  def _FakeOpen(self, path, mode=None):
    if mode is None:
      mode = 'r'
    if mode == 'r' or mode == 'rU' or mode == 'rb':
      if path not in self._file_contents:
        return self._real_open(path, mode)
      return WithableStringIO(self._file_contents[path])

    raise NotImplementedError()

  def _FakeCodecsOpen(self, path, mode=None,
                      encoding=None):  # pylint: disable=unused-argument
    if mode is None:
      mode = 'r'
    if mode == 'r' or mode == 'rU' or mode == 'rb':
      if path not in self._file_contents:
        return self._real_open(path, mode)
      return WithableStringIO(self._file_contents[path])

    raise NotImplementedError()

  def _FakeAbspath(self, path):
    """Normalize the path and ensure it starts with os.path.sep.

    The tests all assume paths start with things like '/my/project',
    and this abspath implementaion makes that assumption work correctly
    on Windows.
    """
    normpath = os.path.normpath(path)
    if not normpath.startswith(os.path.sep):
      normpath = os.path.sep + normpath
    return normpath

  def _FakeExists(self, path):
    if path in self._file_contents:
      return True
    return self._real_exists(path)

  def _FakeWalk(self, top):
    assert os.path.isabs(top)
    all_filenames = self._file_contents.keys()
    pending_prefixes = collections.deque()
    pending_prefixes.append(top)
    visited_prefixes = set()
    while len(pending_prefixes):
      prefix = pending_prefixes.popleft()
      if prefix in visited_prefixes:
        continue
      visited_prefixes.add(prefix)
      if prefix.endswith(os.path.sep):
        prefix_with_trailing_sep = prefix
      else:
        prefix_with_trailing_sep = prefix + os.path.sep

      dirs = set()
      files = []
      for filename in all_filenames:
        if not filename.startswith(prefix_with_trailing_sep):
          continue
        relative_to_prefix = os.path.relpath(filename, prefix)

        dirpart = os.path.dirname(relative_to_prefix)
        if len(dirpart) == 0:
          files.append(relative_to_prefix)
          continue
        parts = dirpart.split(os.sep)
        if len(parts) == 0:
          dirs.add(dirpart)
        else:
          pending = os.path.join(prefix, parts[0])
          dirs.add(parts[0])
          pending_prefixes.appendleft(pending)

      dirs = sorted(dirs)
      yield prefix, dirs, files

  def _FakeListDir(self, dirname):
    raise NotImplementedError()