aboutsummaryrefslogtreecommitdiff
path: root/catapult/telemetry/telemetry/testing/system_stub.py
diff options
context:
space:
mode:
Diffstat (limited to 'catapult/telemetry/telemetry/testing/system_stub.py')
-rw-r--r--catapult/telemetry/telemetry/testing/system_stub.py491
1 files changed, 491 insertions, 0 deletions
diff --git a/catapult/telemetry/telemetry/testing/system_stub.py b/catapult/telemetry/telemetry/testing/system_stub.py
new file mode 100644
index 00000000..78e152f9
--- /dev/null
+++ b/catapult/telemetry/telemetry/testing/system_stub.py
@@ -0,0 +1,491 @@
+# Copyright 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Provides stubs for os, sys and subprocess for testing
+
+This test allows one to test code that itself uses os, sys, and subprocess.
+"""
+
+import ntpath
+import os
+import posixpath
+import re
+import shlex
+import sys
+
+
+class Override(object):
+
+ _overidden_modules = set()
+
+ def __init__(self, base_module, module_list):
+ self.cloud_storage = None
+ self.open = None
+ self.os = None
+ self.perf_control = None
+ self.raw_input = None
+ self.subprocess = None
+ self.sys = None
+ self.thermal_throttle = None
+ self.logging = None
+ stubs = {'cloud_storage': CloudStorageModuleStub,
+ 'open': OpenFunctionStub,
+ 'os': OsModuleStub,
+ 'perf_control': PerfControlModuleStub,
+ 'raw_input': RawInputFunctionStub,
+ 'subprocess': SubprocessModuleStub,
+ 'sys': SysModuleStub,
+ 'thermal_throttle': ThermalThrottleModuleStub,
+ 'logging': LoggingStub,
+ }
+ self.adb_commands = None
+ self.os = None
+ self.subprocess = None
+ self.sys = None
+
+ assert base_module not in self._overidden_modules, (
+ '%s is already overridden' % base_module.__name__)
+ self._overidden_modules.add(base_module)
+ self._base_module = base_module
+ self._overrides = {}
+
+ for module_name in module_list:
+ self._overrides[module_name] = getattr(base_module, module_name, None)
+ setattr(self, module_name, stubs[module_name]())
+ setattr(base_module, module_name, getattr(self, module_name))
+
+ if self.os and self.sys:
+ self.os.path.sys = self.sys
+
+ def __del__(self):
+ assert not len(self._overrides)
+
+ def Restore(self):
+ for module_name, original_module in self._overrides.iteritems():
+ if original_module is None:
+ # This will happen when we override built-in functions, like open.
+ # If we don't delete the attribute, we will shadow the built-in
+ # function with an attribute set to None.
+ delattr(self._base_module, module_name)
+ else:
+ setattr(self._base_module, module_name, original_module)
+ self._overrides = {}
+ self._overidden_modules.remove(self._base_module)
+ self._base_module = None
+
+
+class AdbDevice(object):
+
+ def __init__(self):
+ self.has_root = False
+ self.needs_su = False
+ self.shell_command_handlers = {}
+ self.mock_content = []
+ self.system_properties = {}
+ if self.system_properties.get('ro.product.cpu.abi') == None:
+ self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
+
+ def HasRoot(self):
+ return self.has_root
+
+ def NeedsSU(self):
+ return self.needs_su
+
+ def RunShellCommand(self, args, **kwargs):
+ del kwargs # unused
+ if isinstance(args, basestring):
+ args = shlex.split(args)
+ handler = self.shell_command_handlers[args[0]]
+ return handler(args)
+
+ def FileExists(self, _):
+ return False
+
+ def ReadFile(self, device_path, as_root=False):
+ del device_path, as_root # unused
+ return self.mock_content
+
+ def GetProp(self, property_name):
+ return self.system_properties[property_name]
+
+ def SetProp(self, property_name, property_value):
+ self.system_properties[property_name] = property_value
+
+
+class CloudStorageModuleStub(object):
+ PUBLIC_BUCKET = 'chromium-telemetry'
+ PARTNER_BUCKET = 'chrome-partner-telemetry'
+ INTERNAL_BUCKET = 'chrome-telemetry'
+ BUCKET_ALIASES = {
+ 'public': PUBLIC_BUCKET,
+ 'partner': PARTNER_BUCKET,
+ 'internal': INTERNAL_BUCKET,
+ }
+
+ # These are used to test for CloudStorage errors.
+ INTERNAL_PERMISSION = 2
+ PARTNER_PERMISSION = 1
+ PUBLIC_PERMISSION = 0
+ # Not logged in.
+ CREDENTIALS_ERROR_PERMISSION = -1
+
+ class NotFoundError(Exception):
+ pass
+
+ class CloudStorageError(Exception):
+ pass
+
+ class PermissionError(CloudStorageError):
+ pass
+
+ class CredentialsError(CloudStorageError):
+ pass
+
+ def __init__(self):
+ self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
+ CloudStorageModuleStub.PARTNER_BUCKET:{},
+ CloudStorageModuleStub.PUBLIC_BUCKET:{}}
+ self.remote_paths = self.default_remote_paths
+ self.local_file_hashes = {}
+ self.local_hash_files = {}
+ self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
+ self.downloaded_files = []
+
+ def SetPermissionLevelForTesting(self, permission_level):
+ self.permission_level = permission_level
+
+ def CheckPermissionLevelForBucket(self, bucket):
+ if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
+ return
+ elif (self.permission_level ==
+ CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
+ raise CloudStorageModuleStub.CredentialsError()
+ elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ elif bucket not in self.remote_paths:
+ raise CloudStorageModuleStub.NotFoundError()
+
+ def SetRemotePathsForTesting(self, remote_path_dict=None):
+ if not remote_path_dict:
+ self.remote_paths = self.default_remote_paths
+ return
+ self.remote_paths = remote_path_dict
+
+ def GetRemotePathsForTesting(self):
+ if not self.remote_paths:
+ self.remote_paths = self.default_remote_paths
+ return self.remote_paths
+
+ # Set a dictionary of data files and their "calculated" hashes.
+ def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
+ self.local_file_hashes = calculated_hash_dictionary
+
+ def GetLocalDataFiles(self):
+ return self.local_file_hashes.keys()
+
+ # Set a dictionary of hash files and the hashes they should contain.
+ def SetHashFileContentsForTesting(self, hash_file_dictionary):
+ self.local_hash_files = hash_file_dictionary
+
+ def GetLocalHashFiles(self):
+ return self.local_hash_files.keys()
+
+ def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
+ self.remote_paths[bucket][remote_path] = new_hash
+
+ def List(self, bucket):
+ if not bucket or not bucket in self.remote_paths:
+ bucket_error = ('Incorrect bucket specified, correct buckets:' +
+ str(self.remote_paths))
+ raise CloudStorageModuleStub.CloudStorageError(bucket_error)
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return list(self.remote_paths[bucket].keys())
+
+ def Exists(self, bucket, remote_path):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return remote_path in self.remote_paths[bucket]
+
+ def Insert(self, bucket, remote_path, local_path):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not local_path in self.GetLocalDataFiles():
+ file_path_error = 'Local file path does not exist'
+ raise CloudStorageModuleStub.CloudStorageError(file_path_error)
+ self.remote_paths[bucket][remote_path] = (
+ CloudStorageModuleStub.CalculateHash(self, local_path))
+ return remote_path
+
+ def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not remote_path in self.remote_paths[bucket]:
+ if only_if_changed:
+ return False
+ raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
+ remote_hash = self.remote_paths[bucket][remote_path]
+ local_hash = self.local_file_hashes[local_path]
+ if only_if_changed and remote_hash == local_hash:
+ return False
+ self.downloaded_files.append(remote_path)
+ self.local_file_hashes[local_path] = remote_hash
+ self.local_hash_files[local_path + '.sha1'] = remote_hash
+ return remote_hash
+
+ def Get(self, bucket, remote_path, local_path):
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, False)
+
+ def GetIfChanged(self, local_path, bucket=None):
+ remote_path = os.path.basename(local_path)
+ if bucket:
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, True)
+ result = CloudStorageModuleStub.GetHelper(
+ self, self.PUBLIC_BUCKET, remote_path, local_path, True)
+ if not result:
+ result = CloudStorageModuleStub.GetHelper(
+ self, self.PARTNER_BUCKET, remote_path, local_path, True)
+ if not result:
+ result = CloudStorageModuleStub.GetHelper(
+ self, self.INTERNAL_BUCKET, remote_path, local_path, True)
+ return result
+
+ def GetFilesInDirectoryIfChanged(self, directory, bucket):
+ if os.path.dirname(directory) == directory: # If in the root dir.
+ raise ValueError('Trying to serve root directory from HTTP server.')
+ for dirpath, _, filenames in os.walk(directory):
+ for filename in filenames:
+ path, extension = os.path.splitext(
+ os.path.join(dirpath, filename))
+ if extension != '.sha1':
+ continue
+ self.GetIfChanged(path, bucket)
+
+ def CalculateHash(self, file_path):
+ return self.local_file_hashes[file_path]
+
+ def ReadHash(self, hash_path):
+ return self.local_hash_files[hash_path]
+
+
+class LoggingStub(object):
+ def __init__(self):
+ self.warnings = []
+ self.errors = []
+
+ def info(self, msg, *args):
+ pass
+
+ def error(self, msg, *args):
+ self.errors.append(msg % args)
+
+ def warning(self, msg, *args):
+ self.warnings.append(msg % args)
+
+ def warn(self, msg, *args):
+ self.warning(msg, *args)
+
+
+class OpenFunctionStub(object):
+ class FileStub(object):
+ def __init__(self, data):
+ self._data = data
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ pass
+
+ def read(self, size=None):
+ if size:
+ return self._data[:size]
+ else:
+ return self._data
+
+ def write(self, data):
+ self._data.write(data)
+
+ def close(self):
+ pass
+
+ def __init__(self):
+ self.files = {}
+
+ def __call__(self, name, *args, **kwargs):
+ return OpenFunctionStub.FileStub(self.files[name])
+
+
+class OsModuleStub(object):
+ class OsEnvironModuleStub(object):
+ def get(self, _):
+ return None
+
+ class OsPathModuleStub(object):
+ def __init__(self, sys_module):
+ self.sys = sys_module
+ self.files = []
+ self.dirs = []
+
+ def exists(self, path):
+ return path in self.files
+
+ def isfile(self, path):
+ return path in self.files
+
+ def isdir(self, path):
+ return path in self.dirs
+
+ def join(self, *paths):
+ def IsAbsolutePath(path):
+ if self.sys.platform.startswith('win'):
+ return re.match('[a-zA-Z]:\\\\', path)
+ else:
+ return path.startswith('/')
+
+ # Per Python specification, if any component is an absolute path,
+ # discard previous components.
+ for index, path in reversed(list(enumerate(paths))):
+ if IsAbsolutePath(path):
+ paths = paths[index:]
+ break
+
+ if self.sys.platform.startswith('win'):
+ tmp = os.path.join(*paths)
+ return tmp.replace('/', '\\')
+ else:
+ tmp = os.path.join(*paths)
+ return tmp.replace('\\', '/')
+
+ def basename(self, path):
+ if self.sys.platform.startswith('win'):
+ return ntpath.basename(path)
+ else:
+ return posixpath.basename(path)
+
+ @staticmethod
+ def abspath(path):
+ return os.path.abspath(path)
+
+ @staticmethod
+ def expanduser(path):
+ return os.path.expanduser(path)
+
+ @staticmethod
+ def dirname(path):
+ return os.path.dirname(path)
+
+ @staticmethod
+ def realpath(path):
+ return os.path.realpath(path)
+
+ @staticmethod
+ def split(path):
+ return os.path.split(path)
+
+ @staticmethod
+ def splitext(path):
+ return os.path.splitext(path)
+
+ @staticmethod
+ def splitdrive(path):
+ return os.path.splitdrive(path)
+
+ X_OK = os.X_OK
+
+ sep = os.sep
+ pathsep = os.pathsep
+
+ def __init__(self, sys_module=sys):
+ self.path = OsModuleStub.OsPathModuleStub(sys_module)
+ self.environ = OsModuleStub.OsEnvironModuleStub()
+ self.display = ':0'
+ self.local_app_data = None
+ self.sys_path = None
+ self.program_files = None
+ self.program_files_x86 = None
+ self.devnull = os.devnull
+ self._directory = {}
+
+ def access(self, path, _):
+ return path in self.path.files
+
+ def getenv(self, name, value=None):
+ if name == 'DISPLAY':
+ env = self.display
+ elif name == 'LOCALAPPDATA':
+ env = self.local_app_data
+ elif name == 'PATH':
+ env = self.sys_path
+ elif name == 'PROGRAMFILES':
+ env = self.program_files
+ elif name == 'PROGRAMFILES(X86)':
+ env = self.program_files_x86
+ else:
+ raise NotImplementedError('Unsupported getenv')
+ return env if env else value
+
+ def chdir(self, path):
+ pass
+
+ def walk(self, top):
+ for dir_name in self._directory:
+ yield top, dir_name, self._directory[dir_name]
+
+
+class PerfControlModuleStub(object):
+ class PerfControlStub(object):
+ def __init__(self, adb):
+ pass
+
+ def __init__(self):
+ self.PerfControl = PerfControlModuleStub.PerfControlStub
+
+
+class RawInputFunctionStub(object):
+ def __init__(self):
+ self.input = ''
+
+ def __call__(self, name, *args, **kwargs):
+ return self.input
+
+
+class SubprocessModuleStub(object):
+ class PopenStub(object):
+ def __init__(self):
+ self.communicate_result = ('', '')
+ self.returncode_result = 0
+
+ def __call__(self, args, **kwargs):
+ return self
+
+ def communicate(self):
+ return self.communicate_result
+
+ @property
+ def returncode(self):
+ return self.returncode_result
+
+ def __init__(self):
+ self.Popen = SubprocessModuleStub.PopenStub()
+ self.PIPE = None
+
+ def call(self, *args, **kwargs):
+ pass
+
+
+class SysModuleStub(object):
+ def __init__(self):
+ self.platform = ''
+
+
+class ThermalThrottleModuleStub(object):
+ class ThermalThrottleStub(object):
+ def __init__(self, adb):
+ pass
+
+ def __init__(self):
+ self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub