summaryrefslogtreecommitdiff
path: root/third_party/catapult/devil/devil/utils
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/catapult/devil/devil/utils')
-rw-r--r--third_party/catapult/devil/devil/utils/__init__.py23
-rwxr-xr-xthird_party/catapult/devil/devil/utils/battor_device_mapping.py309
-rw-r--r--third_party/catapult/devil/devil/utils/cmd_helper.py394
-rwxr-xr-xthird_party/catapult/devil/devil/utils/cmd_helper_test.py262
-rw-r--r--third_party/catapult/devil/devil/utils/file_utils.py31
-rwxr-xr-xthird_party/catapult/devil/devil/utils/find_usb_devices.py532
-rwxr-xr-xthird_party/catapult/devil/devil/utils/find_usb_devices_test.py379
-rw-r--r--third_party/catapult/devil/devil/utils/geometry.py75
-rw-r--r--third_party/catapult/devil/devil/utils/geometry_test.py61
-rw-r--r--third_party/catapult/devil/devil/utils/host_utils.py16
-rw-r--r--third_party/catapult/devil/devil/utils/lazy/__init__.py5
-rw-r--r--third_party/catapult/devil/devil/utils/lazy/weak_constant.py29
-rw-r--r--third_party/catapult/devil/devil/utils/lsusb.py174
-rwxr-xr-xthird_party/catapult/devil/devil/utils/lsusb_test.py250
-rwxr-xr-xthird_party/catapult/devil/devil/utils/markdown.py320
-rwxr-xr-xthird_party/catapult/devil/devil/utils/markdown_test.py121
-rw-r--r--third_party/catapult/devil/devil/utils/mock_calls.py180
-rwxr-xr-xthird_party/catapult/devil/devil/utils/mock_calls_test.py173
-rw-r--r--third_party/catapult/devil/devil/utils/parallelizer.py238
-rw-r--r--third_party/catapult/devil/devil/utils/parallelizer_test.py162
-rw-r--r--third_party/catapult/devil/devil/utils/reraiser_thread.py228
-rw-r--r--third_party/catapult/devil/devil/utils/reraiser_thread_unittest.py117
-rwxr-xr-xthird_party/catapult/devil/devil/utils/reset_usb.py111
-rw-r--r--third_party/catapult/devil/devil/utils/run_tests_helper.py44
-rw-r--r--third_party/catapult/devil/devil/utils/signal_handler.py48
-rw-r--r--third_party/catapult/devil/devil/utils/test/data/test_serial_map.json1
-rw-r--r--third_party/catapult/devil/devil/utils/timeout_retry.py175
-rwxr-xr-xthird_party/catapult/devil/devil/utils/timeout_retry_unittest.py79
-rwxr-xr-xthird_party/catapult/devil/devil/utils/update_mapping.py47
-rw-r--r--third_party/catapult/devil/devil/utils/usb_hubs.py165
-rw-r--r--third_party/catapult/devil/devil/utils/watchdog_timer.py47
-rw-r--r--third_party/catapult/devil/devil/utils/zip_utils.py33
32 files changed, 4829 insertions, 0 deletions
diff --git a/third_party/catapult/devil/devil/utils/__init__.py b/third_party/catapult/devil/devil/utils/__init__.py
new file mode 100644
index 0000000000..ff84988dbd
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/__init__.py
@@ -0,0 +1,23 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import sys
+
+def _JoinPath(*path_parts):
+ return os.path.abspath(os.path.join(*path_parts))
+
+
+def _AddDirToPythonPath(*path_parts):
+ path = _JoinPath(*path_parts)
+ if os.path.isdir(path) and path not in sys.path:
+ # Some call sites that use Telemetry assume that sys.path[0] is the
+ # directory containing the script, so we add these extra paths to right
+ # after sys.path[0].
+ sys.path.insert(1, path)
+
+_CATAPULT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ os.path.pardir, os.path.pardir, os.path.pardir)
+
+_AddDirToPythonPath(_CATAPULT_DIR, 'common', 'battor')
diff --git a/third_party/catapult/devil/devil/utils/battor_device_mapping.py b/third_party/catapult/devil/devil/utils/battor_device_mapping.py
new file mode 100755
index 0000000000..8cabb8304e
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/battor_device_mapping.py
@@ -0,0 +1,309 @@
+#!/usr/bin/python
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+'''
+This script provides tools to map BattOrs to phones.
+
+Phones are identified by the following string:
+
+"Phone serial number" - Serial number of the phone. This can be
+obtained via 'adb devices' or 'usb-devices', and is not expected
+to change for a given phone.
+
+BattOrs are identified by the following two strings:
+
+"BattOr serial number" - Serial number of the BattOr. This can be
+obtained via 'usb-devices', and is not expected to change for
+a given BattOr.
+
+"BattOr path" - The path of the form '/dev/ttyUSB*' that is used
+to communicate with the BattOr (the battor_agent binary takes
+this BattOr path as a parameter). The BattOr path is frequently
+reassigned by the OS, most often when the device is disconnected
+and then reconnected. Thus, the BattOr path cannot be expected
+to be stable.
+
+In a typical application, the user will require the BattOr path
+for the BattOr that is plugged into a given phone. For instance,
+the user will be running tracing on a particular phone, and will
+need to know which BattOr path to use to communicate with the BattOr
+to get the corresponding power trace.
+
+Getting this mapping requires two steps: (1) determining the
+mapping between phone serial numbers and BattOr serial numbers, and
+(2) getting the BattOr path corresponding to a given BattOr serial
+number.
+
+For step (1), we generate a JSON file giving this mapping. This
+JSON file consists of a list of items of the following form:
+[{'phone': <phone serial 1>, 'battor': <battor serial 1>},
+{'phone': <phone serial 2>, 'battor': <battor serial 2>}, ...]
+
+The default way to generate this JSON file is using the function
+GenerateSerialMapFile, which generates a mapping based on assuming
+that the system has two identical USB hubs connected to it, and
+the phone plugged into physical port number 1 on one hub corresponds
+to the BattOr plugged into physical port number 1 on the other hub,
+and similarly with physical port numbers 2, 3, etc. This generates
+the map file based on the structure at the time GenerateSerialMapFile called.
+Note that after the map file is generated, port numbers are no longer used;
+the user could move around the devices in the ports without affecting
+which phone goes with which BattOr. (Thus, if the user wanted to update the
+mapping to match the new port connections, the user would have to
+re-generate this file.)
+
+The script update_mapping.py will do this updating from the command line.
+
+If the user wanted to specify a custom mapping, the user could instead
+create the JSON file manually. (In this case, hubs would not be necessary
+and the physical ports connected would be irrelevant.)
+
+Step (2) is conducted through the function GetBattOrPathFromPhoneSerial,
+which takes a serial number mapping generated via step (1) and a phone
+serial number, then gets the corresponding BattOr serial number from the
+map and determines its BattOr path (e.g. /dev/ttyUSB0). Since BattOr paths
+can change if devices are connected and disconnected (even if connected
+or disconnected via the same port) this function should be called to
+determine the BattOr path every time before connecting to the BattOr.
+
+Note that if there is only one BattOr connected to the system, then
+GetBattOrPathFromPhoneSerial will always return that BattOr and will ignore
+the mapping file. Thus, if the user never has more than one BattOr connected
+to the system, the user will not need to generate mapping files.
+'''
+
+
+import json
+import collections
+
+from battor import battor_error
+from devil.utils import find_usb_devices
+from devil.utils import usb_hubs
+
+
+def GetBattOrList(device_tree_map):
+ return [x for x in find_usb_devices.GetTTYList()
+ if IsBattOr(x, device_tree_map)]
+
+
+def IsBattOr(tty_string, device_tree_map):
+ (bus, device) = find_usb_devices.GetBusDeviceFromTTY(tty_string)
+ node = device_tree_map[bus].FindDeviceNumber(device)
+ return '0403:6001' in node.desc
+
+
+def GetBattOrSerialNumbers(device_tree_map):
+ for x in find_usb_devices.GetTTYList():
+ if IsBattOr(x, device_tree_map):
+ (bus, device) = find_usb_devices.GetBusDeviceFromTTY(x)
+ devnode = device_tree_map[bus].FindDeviceNumber(device)
+ yield devnode.serial
+
+
+def ReadSerialMapFile(filename):
+ """Reads JSON file giving phone-to-battor serial number map.
+
+ Parses a JSON file consisting of a list of items of the following form:
+ [{'phone': <phone serial 1>, 'battor': <battor serial 1>},
+ {'phone': <phone serial 2>, 'battor': <battor serial 2>}, ...]
+
+ indicating which phone serial numbers should be matched with
+ which BattOr serial numbers. Returns dictionary of the form:
+
+ {<phone serial 1>: <BattOr serial 1>,
+ <phone serial 2>: <BattOr serial 2>}
+
+ Args:
+ filename: Name of file to read.
+ """
+ result = {}
+ with open(filename, 'r') as infile:
+ in_dict = json.load(infile)
+ for x in in_dict:
+ result[x['phone']] = x['battor']
+ return result
+
+def WriteSerialMapFile(filename, serial_map):
+ """Writes a map of phone serial numbers to BattOr serial numbers to file.
+
+ Writes a JSON file consisting of a list of items of the following form:
+ [{'phone': <phone serial 1>, 'battor': <battor serial 1>},
+ {'phone': <phone serial 2>, 'battor': <battor serial 2>}, ...]
+
+ indicating which phone serial numbers should be matched with
+ which BattOr serial numbers. Mapping is based on the physical port numbers
+ of the hubs that the BattOrs and phones are connected to.
+
+ Args:
+ filename: Name of file to write.
+ serial_map: Serial map {phone: battor}
+ """
+ result = []
+ for (phone, battor) in serial_map.iteritems():
+ result.append({'phone': phone, 'battor': battor})
+ with open(filename, 'w') as outfile:
+ json.dump(result, outfile)
+
+def GenerateSerialMap(hub_types=None):
+ """Generates a map of phone serial numbers to BattOr serial numbers.
+
+ Generates a dict of:
+ {<phone serial 1>: <battor serial 1>,
+ <phone serial 2>: <battor serial 2>}
+ indicating which phone serial numbers should be matched with
+ which BattOr serial numbers. Mapping is based on the physical port numbers
+ of the hubs that the BattOrs and phones are connected to.
+
+ Args:
+ hub_types: List of hub types to check for. If not specified, checks
+ for all defined hub types. (see usb_hubs.py for details)
+ """
+ if hub_types:
+ hub_types = [usb_hubs.GetHubType(x) for x in hub_types]
+ else:
+ hub_types = usb_hubs.ALL_HUBS
+
+ devtree = find_usb_devices.GetBusNumberToDeviceTreeMap()
+
+ # List of serial numbers in the system that represent BattOrs.
+ battor_serials = list(GetBattOrSerialNumbers(devtree))
+
+ # If there's only one BattOr in the system, then a serial number ma
+ # is not necessary.
+ if len(battor_serials) == 1:
+ return {}
+
+ # List of dictionaries, one for each hub, that maps the physical
+ # port number to the serial number of that hub. For instance, in a 2
+ # hub system, this could return [{1:'ab', 2:'cd'}, {1:'jkl', 2:'xyz'}]
+ # where 'ab' and 'cd' are the phone serial numbers and 'jkl' and 'xyz'
+ # are the BattOr serial numbers.
+ port_to_serial = find_usb_devices.GetAllPhysicalPortToSerialMaps(
+ hub_types, device_tree_map=devtree)
+
+ class serials(object):
+ def __init__(self):
+ self.phone = None
+ self.battor = None
+
+ # Map of {physical port number: [phone serial #, BattOr serial #]. This
+ # map is populated by executing the code below. For instance, in the above
+ # example, after the code below is executed, port_to_devices would equal
+ # {1: ['ab', 'jkl'], 2: ['cd', 'xyz']}
+ port_to_devices = collections.defaultdict(serials)
+ for hub in port_to_serial:
+ for (port, serial) in hub.iteritems():
+ if serial in battor_serials:
+ if port_to_devices[port].battor is not None:
+ raise battor_error.BattOrError('Multiple BattOrs on same port number')
+ else:
+ port_to_devices[port].battor = serial
+ else:
+ if port_to_devices[port].phone is not None:
+ raise battor_error.BattOrError('Multiple phones on same port number')
+ else:
+ port_to_devices[port].phone = serial
+
+ # Turn the port_to_devices map into a map of the form
+ # {phone serial number: BattOr serial number}.
+ result = {}
+ for pair in port_to_devices.values():
+ if pair.phone is None:
+ continue
+ if pair.battor is None:
+ raise battor_error.BattOrError(
+ 'Phone detected with no corresponding BattOr')
+ result[pair.phone] = pair.battor
+ return result
+
+def GenerateSerialMapFile(filename, hub_types=None):
+ """Generates a serial map file and writes it."""
+ WriteSerialMapFile(filename, GenerateSerialMap(hub_types))
+
+def _PhoneToPathMap(serial, serial_map, devtree):
+ """Maps phone serial number to TTY path, assuming serial map is provided."""
+ try:
+ battor_serial = serial_map[serial]
+ except KeyError:
+ raise battor_error.BattOrError('Serial number not found in serial map.')
+ for tree in devtree.values():
+ for node in tree.AllNodes():
+ if isinstance(node, find_usb_devices.USBDeviceNode):
+ if node.serial == battor_serial:
+ bus_device_to_tty = find_usb_devices.GetBusDeviceToTTYMap()
+ bus_device = (node.bus_num, node.device_num)
+ try:
+ return bus_device_to_tty[bus_device]
+ except KeyError:
+ raise battor_error.BattOrError(
+ 'Device with given serial number not a BattOr '
+ '(does not have TTY path)')
+
+
+def GetBattOrPathFromPhoneSerial(serial, serial_map=None,
+ serial_map_file=None):
+ """Gets the TTY path (e.g. '/dev/ttyUSB0') to communicate with the BattOr.
+
+ (1) If serial_map is given, it is treated as a dictionary mapping
+ phone serial numbers to BattOr serial numbers. This function will get the
+ TTY path for the given BattOr serial number.
+
+ (2) If serial_map_file is given, it is treated as the name of a
+ phone-to-BattOr mapping file (generated with GenerateSerialMapFile)
+ and this will be loaded and used as the dict to map port numbers to
+ BattOr serial numbers.
+
+ You can only give one of serial_map and serial_map_file.
+
+ Args:
+ serial: Serial number of phone connected on the same physical port that
+ the BattOr is connected to.
+ serial_map: Map of phone serial numbers to BattOr serial numbers, given
+ as a dictionary.
+ serial_map_file: Map of phone serial numbers to BattOr serial numbers,
+ given as a file.
+ hub_types: List of hub types to check for. Used only if serial_map_file
+ is None.
+
+ Returns:
+ Device string used to communicate with device.
+
+ Raises:
+ ValueError: If serial number is not given.
+ BattOrError: If BattOr not found or unexpected USB topology.
+ """
+ # If there's only one BattOr connected to the system, just use that one.
+ # This allows for use on, e.g., a developer's workstation with no hubs.
+ devtree = find_usb_devices.GetBusNumberToDeviceTreeMap()
+ all_battors = GetBattOrList(devtree)
+ if len(all_battors) == 1:
+ return '/dev/' + all_battors[0]
+
+ if not serial:
+ raise battor_error.BattOrError(
+ 'Two or more BattOrs connected, no serial provided')
+
+ if serial_map and serial_map_file:
+ raise ValueError('Cannot specify both serial_map and serial_map_file')
+
+ if serial_map_file:
+ serial_map = ReadSerialMapFile(serial_map_file)
+
+ tty_string = _PhoneToPathMap(serial, serial_map, devtree)
+
+ if not tty_string:
+ raise battor_error.BattOrError(
+ 'No device with given serial number detected.')
+
+ if IsBattOr(tty_string, devtree):
+ return '/dev/' + tty_string
+ else:
+ raise battor_error.BattOrError(
+ 'Device with given serial number is not a BattOr.')
+
+if __name__ == '__main__':
+ # Main function for testing purposes
+ print GenerateSerialMap()
diff --git a/third_party/catapult/devil/devil/utils/cmd_helper.py b/third_party/catapult/devil/devil/utils/cmd_helper.py
new file mode 100644
index 0000000000..06c105fcc5
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/cmd_helper.py
@@ -0,0 +1,394 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""A wrapper for subprocess to make calling shell commands easier."""
+
+import logging
+import os
+import pipes
+import select
+import signal
+import string
+import StringIO
+import subprocess
+import sys
+import time
+
+# fcntl is not available on Windows.
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
+
+logger = logging.getLogger(__name__)
+
+_SafeShellChars = frozenset(string.ascii_letters + string.digits + '@%_-+=:,./')
+
+
+def SingleQuote(s):
+ """Return an shell-escaped version of the string using single quotes.
+
+ Reliably quote a string which may contain unsafe characters (e.g. space,
+ quote, or other special characters such as '$').
+
+ The returned value can be used in a shell command line as one token that gets
+ to be interpreted literally.
+
+ Args:
+ s: The string to quote.
+
+ Return:
+ The string quoted using single quotes.
+ """
+ return pipes.quote(s)
+
+
+def DoubleQuote(s):
+ """Return an shell-escaped version of the string using double quotes.
+
+ Reliably quote a string which may contain unsafe characters (e.g. space
+ or quote characters), while retaining some shell features such as variable
+ interpolation.
+
+ The returned value can be used in a shell command line as one token that gets
+ to be further interpreted by the shell.
+
+ The set of characters that retain their special meaning may depend on the
+ shell implementation. This set usually includes: '$', '`', '\', '!', '*',
+ and '@'.
+
+ Args:
+ s: The string to quote.
+
+ Return:
+ The string quoted using double quotes.
+ """
+ if not s:
+ return '""'
+ elif all(c in _SafeShellChars for c in s):
+ return s
+ else:
+ return '"' + s.replace('"', '\\"') + '"'
+
+
+def ShrinkToSnippet(cmd_parts, var_name, var_value):
+ """Constructs a shell snippet for a command using a variable to shrink it.
+
+ Takes into account all quoting that needs to happen.
+
+ Args:
+ cmd_parts: A list of command arguments.
+ var_name: The variable that holds var_value.
+ var_value: The string to replace in cmd_parts with $var_name
+
+ Returns:
+ A shell snippet that does not include setting the variable.
+ """
+ def shrink(value):
+ parts = (x and SingleQuote(x) for x in value.split(var_value))
+ with_substitutions = ('"$%s"' % var_name).join(parts)
+ return with_substitutions or "''"
+
+ return ' '.join(shrink(part) for part in cmd_parts)
+
+
+def Popen(args, stdout=None, stderr=None, shell=None, cwd=None, env=None):
+ # preexec_fn isn't supported on windows.
+ if sys.platform == 'win32':
+ preexec_fn = None
+ else:
+ preexec_fn = lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+ return subprocess.Popen(
+ args=args, cwd=cwd, stdout=stdout, stderr=stderr,
+ shell=shell, close_fds=True, env=env, preexec_fn=preexec_fn)
+
+
+def Call(args, stdout=None, stderr=None, shell=None, cwd=None, env=None):
+ pipe = Popen(args, stdout=stdout, stderr=stderr, shell=shell, cwd=cwd,
+ env=env)
+ pipe.communicate()
+ return pipe.wait()
+
+
+def RunCmd(args, cwd=None):
+ """Opens a subprocess to execute a program and returns its return value.
+
+ Args:
+ args: A string or a sequence of program arguments. The program to execute is
+ the string or the first item in the args sequence.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+
+ Returns:
+ Return code from the command execution.
+ """
+ logger.info(str(args) + ' ' + (cwd or ''))
+ return Call(args, cwd=cwd)
+
+
+def GetCmdOutput(args, cwd=None, shell=False):
+ """Open a subprocess to execute a program and returns its output.
+
+ Args:
+ args: A string or a sequence of program arguments. The program to execute is
+ the string or the first item in the args sequence.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+ shell: Whether to execute args as a shell command.
+
+ Returns:
+ Captures and returns the command's stdout.
+ Prints the command's stderr to logger (which defaults to stdout).
+ """
+ (_, output) = GetCmdStatusAndOutput(args, cwd, shell)
+ return output
+
+
+def _ValidateAndLogCommand(args, cwd, shell):
+ if isinstance(args, basestring):
+ if not shell:
+ raise Exception('string args must be run with shell=True')
+ else:
+ if shell:
+ raise Exception('array args must be run with shell=False')
+ args = ' '.join(SingleQuote(c) for c in args)
+ if cwd is None:
+ cwd = ''
+ else:
+ cwd = ':' + cwd
+ logger.info('[host]%s> %s', cwd, args)
+ return args
+
+
+def GetCmdStatusAndOutput(args, cwd=None, shell=False):
+ """Executes a subprocess and returns its exit code and output.
+
+ Args:
+ args: A string or a sequence of program arguments. The program to execute is
+ the string or the first item in the args sequence.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+ shell: Whether to execute args as a shell command. Must be True if args
+ is a string and False if args is a sequence.
+
+ Returns:
+ The 2-tuple (exit code, output).
+ """
+ status, stdout, stderr = GetCmdStatusOutputAndError(
+ args, cwd=cwd, shell=shell)
+
+ if stderr:
+ logger.critical('STDERR: %s', stderr)
+ logger.debug('STDOUT: %s%s', stdout[:4096].rstrip(),
+ '<truncated>' if len(stdout) > 4096 else '')
+ return (status, stdout)
+
+
+def GetCmdStatusOutputAndError(args, cwd=None, shell=False):
+ """Executes a subprocess and returns its exit code, output, and errors.
+
+ Args:
+ args: A string or a sequence of program arguments. The program to execute is
+ the string or the first item in the args sequence.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+ shell: Whether to execute args as a shell command. Must be True if args
+ is a string and False if args is a sequence.
+
+ Returns:
+ The 2-tuple (exit code, output).
+ """
+ _ValidateAndLogCommand(args, cwd, shell)
+ pipe = Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ shell=shell, cwd=cwd)
+ stdout, stderr = pipe.communicate()
+ return (pipe.returncode, stdout, stderr)
+
+
+class TimeoutError(Exception):
+ """Module-specific timeout exception."""
+
+ def __init__(self, output=None):
+ super(TimeoutError, self).__init__()
+ self._output = output
+
+ @property
+ def output(self):
+ return self._output
+
+
+def _IterProcessStdout(process, iter_timeout=None, timeout=None,
+ buffer_size=4096, poll_interval=1):
+ """Iterate over a process's stdout.
+
+ This is intentionally not public.
+
+ Args:
+ process: The process in question.
+ iter_timeout: An optional length of time, in seconds, to wait in
+ between each iteration. If no output is received in the given
+ time, this generator will yield None.
+ timeout: An optional length of time, in seconds, during which
+ the process must finish. If it fails to do so, a TimeoutError
+ will be raised.
+ buffer_size: The maximum number of bytes to read (and thus yield) at once.
+ poll_interval: The length of time to wait in calls to `select.select`.
+ If iter_timeout is set, the remaining length of time in the iteration
+ may take precedence.
+ Raises:
+ TimeoutError: if timeout is set and the process does not complete.
+ Yields:
+ basestrings of data or None.
+ """
+
+ assert fcntl, 'fcntl module is required'
+ try:
+ # Enable non-blocking reads from the child's stdout.
+ child_fd = process.stdout.fileno()
+ fl = fcntl.fcntl(child_fd, fcntl.F_GETFL)
+ fcntl.fcntl(child_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+
+ end_time = (time.time() + timeout) if timeout else None
+ iter_end_time = (time.time() + iter_timeout) if iter_timeout else None
+
+ while True:
+ if end_time and time.time() > end_time:
+ raise TimeoutError()
+ if iter_end_time and time.time() > iter_end_time:
+ yield None
+ iter_end_time = time.time() + iter_timeout
+
+ if iter_end_time:
+ iter_aware_poll_interval = min(
+ poll_interval,
+ max(0, iter_end_time - time.time()))
+ else:
+ iter_aware_poll_interval = poll_interval
+
+ read_fds, _, _ = select.select(
+ [child_fd], [], [], iter_aware_poll_interval)
+ if child_fd in read_fds:
+ data = os.read(child_fd, buffer_size)
+ if not data:
+ break
+ yield data
+ if process.poll() is not None:
+ break
+ finally:
+ try:
+ if process.returncode is None:
+ # Make sure the process doesn't stick around if we fail with an
+ # exception.
+ process.kill()
+ except OSError:
+ pass
+ process.wait()
+
+
+def GetCmdStatusAndOutputWithTimeout(args, timeout, cwd=None, shell=False,
+ logfile=None):
+ """Executes a subprocess with a timeout.
+
+ Args:
+ args: List of arguments to the program, the program to execute is the first
+ element.
+ timeout: the timeout in seconds or None to wait forever.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+ shell: Whether to execute args as a shell command. Must be True if args
+ is a string and False if args is a sequence.
+ logfile: Optional file-like object that will receive output from the
+ command as it is running.
+
+ Returns:
+ The 2-tuple (exit code, output).
+ Raises:
+ TimeoutError on timeout.
+ """
+ _ValidateAndLogCommand(args, cwd, shell)
+ output = StringIO.StringIO()
+ process = Popen(args, cwd=cwd, shell=shell, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ try:
+ for data in _IterProcessStdout(process, timeout=timeout):
+ if logfile:
+ logfile.write(data)
+ output.write(data)
+ except TimeoutError:
+ raise TimeoutError(output.getvalue())
+
+ str_output = output.getvalue()
+ logger.debug('STDOUT+STDERR: %s%s', str_output[:4096].rstrip(),
+ '<truncated>' if len(str_output) > 4096 else '')
+ return process.returncode, str_output
+
+
+def IterCmdOutputLines(args, iter_timeout=None, timeout=None, cwd=None,
+ shell=False, check_status=True):
+ """Executes a subprocess and continuously yields lines from its output.
+
+ Args:
+ args: List of arguments to the program, the program to execute is the first
+ element.
+ iter_timeout: Timeout for each iteration, in seconds.
+ timeout: Timeout for the entire command, in seconds.
+ cwd: If not None, the subprocess's current directory will be changed to
+ |cwd| before it's executed.
+ shell: Whether to execute args as a shell command. Must be True if args
+ is a string and False if args is a sequence.
+ check_status: A boolean indicating whether to check the exit status of the
+ process after all output has been read.
+ Yields:
+ The output of the subprocess, line by line.
+
+ Raises:
+ CalledProcessError if check_status is True and the process exited with a
+ non-zero exit status.
+ """
+ cmd = _ValidateAndLogCommand(args, cwd, shell)
+ process = Popen(args, cwd=cwd, shell=shell, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ return _IterCmdOutputLines(
+ process, cmd, iter_timeout=iter_timeout, timeout=timeout,
+ check_status=check_status)
+
+def _IterCmdOutputLines(process, cmd, iter_timeout=None, timeout=None,
+ check_status=True):
+ buffer_output = ''
+
+ iter_end = None
+ cur_iter_timeout = None
+ if iter_timeout:
+ iter_end = time.time() + iter_timeout
+ cur_iter_timeout = iter_timeout
+
+ for data in _IterProcessStdout(process, iter_timeout=cur_iter_timeout,
+ timeout=timeout):
+ if iter_timeout:
+ # Check whether the current iteration has timed out.
+ cur_iter_timeout = iter_end - time.time()
+ if data is None or cur_iter_timeout < 0:
+ yield None
+ iter_end = time.time() + iter_timeout
+ continue
+ else:
+ assert data is not None, (
+ 'Iteration received no data despite no iter_timeout being set. '
+ 'cmd: %s' % cmd)
+
+ # Construct lines to yield from raw data.
+ buffer_output += data
+ has_incomplete_line = buffer_output[-1] not in '\r\n'
+ lines = buffer_output.splitlines()
+ buffer_output = lines.pop() if has_incomplete_line else ''
+ for line in lines:
+ yield line
+ if iter_timeout:
+ iter_end = time.time() + iter_timeout
+
+ if buffer_output:
+ yield buffer_output
+ if check_status and process.returncode:
+ raise subprocess.CalledProcessError(process.returncode, cmd)
diff --git a/third_party/catapult/devil/devil/utils/cmd_helper_test.py b/third_party/catapult/devil/devil/utils/cmd_helper_test.py
new file mode 100755
index 0000000000..783c4137c8
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/cmd_helper_test.py
@@ -0,0 +1,262 @@
+#!/usr/bin/env python
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for the cmd_helper module."""
+
+import unittest
+import subprocess
+import time
+
+from devil import devil_env
+from devil.utils import cmd_helper
+
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
+
+class CmdHelperSingleQuoteTest(unittest.TestCase):
+
+ def testSingleQuote_basic(self):
+ self.assertEquals('hello',
+ cmd_helper.SingleQuote('hello'))
+
+ def testSingleQuote_withSpaces(self):
+ self.assertEquals("'hello world'",
+ cmd_helper.SingleQuote('hello world'))
+
+ def testSingleQuote_withUnsafeChars(self):
+ self.assertEquals("""'hello'"'"'; rm -rf /'""",
+ cmd_helper.SingleQuote("hello'; rm -rf /"))
+
+ def testSingleQuote_dontExpand(self):
+ test_string = 'hello $TEST_VAR'
+ cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
+ self.assertEquals(test_string,
+ cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
+
+
+class CmdHelperDoubleQuoteTest(unittest.TestCase):
+
+ def testDoubleQuote_basic(self):
+ self.assertEquals('hello',
+ cmd_helper.DoubleQuote('hello'))
+
+ def testDoubleQuote_withSpaces(self):
+ self.assertEquals('"hello world"',
+ cmd_helper.DoubleQuote('hello world'))
+
+ def testDoubleQuote_withUnsafeChars(self):
+ self.assertEquals('''"hello\\"; rm -rf /"''',
+ cmd_helper.DoubleQuote('hello"; rm -rf /'))
+
+ def testSingleQuote_doExpand(self):
+ test_string = 'hello $TEST_VAR'
+ cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
+ self.assertEquals('hello world',
+ cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
+
+
+class CmdHelperShinkToSnippetTest(unittest.TestCase):
+
+ def testShrinkToSnippet_noArgs(self):
+ self.assertEquals('foo',
+ cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
+ self.assertEquals("'foo foo'",
+ cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
+ self.assertEquals('"$a"\' bar\'',
+ cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
+ self.assertEquals('\'foo \'"$a"',
+ cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
+ self.assertEquals('foo"$a"',
+ cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))
+
+ def testShrinkToSnippet_singleArg(self):
+ self.assertEquals("foo ''",
+ cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
+ self.assertEquals("foo foo",
+ cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
+ self.assertEquals('"$a" "$a"',
+ cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
+ self.assertEquals('foo "$a""$a"',
+ cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
+ self.assertEquals('foo "$a"\' \'"$a"',
+ cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
+ self.assertEquals('foo "$a""$a"\' \'',
+ cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
+ self.assertEquals('foo \' \'"$a""$a"\' \'',
+ cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
+
+
+_DEFAULT = 'DEFAULT'
+
+
+class _ProcessOutputEvent(object):
+
+ def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
+ self.select_fds = select_fds
+ self.read_contents = read_contents
+ self.ts = ts
+
+
+class _MockProcess(object):
+
+ def __init__(self, output_sequence=None, return_value=0):
+
+ # Arbitrary.
+ fake_stdout_fileno = 25
+
+ self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
+ self.mock_proc.stdout = mock.MagicMock()
+ self.mock_proc.stdout.fileno = mock.MagicMock(
+ return_value=fake_stdout_fileno)
+ self.mock_proc.returncode = None
+
+ self._return_value = return_value
+
+ # This links the behavior of os.read, select.select, time.time, and
+ # <process>.poll. The output sequence can be thought of as a list of
+ # return values for select.select with corresponding return values for
+ # the other calls at any time between that select call and the following
+ # one. We iterate through the sequence only on calls to select.select.
+ #
+ # os.read is a special case, though, where we only return a given chunk
+ # of data *once* after a given call to select.
+
+ if not output_sequence:
+ output_sequence = []
+
+ # Use an leading element to make the iteration logic work.
+ initial_seq_element = _ProcessOutputEvent(
+ _DEFAULT, '',
+ output_sequence[0].ts if output_sequence else _DEFAULT)
+ output_sequence.insert(0, initial_seq_element)
+
+ for o in output_sequence:
+ if o.select_fds == _DEFAULT:
+ if o.read_contents is None:
+ o.select_fds = []
+ else:
+ o.select_fds = [fake_stdout_fileno]
+ if o.ts == _DEFAULT:
+ o.ts = time.time()
+ self._output_sequence = output_sequence
+
+ self._output_seq_index = 0
+ self._read_flags = [False] * len(output_sequence)
+
+ def read_side_effect(*_args, **_kwargs):
+ if self._read_flags[self._output_seq_index]:
+ return None
+ self._read_flags[self._output_seq_index] = True
+ return self._output_sequence[self._output_seq_index].read_contents
+
+ def select_side_effect(*_args, **_kwargs):
+ if self._output_seq_index is None:
+ self._output_seq_index = 0
+ else:
+ self._output_seq_index += 1
+ return (self._output_sequence[self._output_seq_index].select_fds,
+ None, None)
+
+ def time_side_effect(*_args, **_kwargs):
+ return self._output_sequence[self._output_seq_index].ts
+
+ def poll_side_effect(*_args, **_kwargs):
+ if self._output_seq_index >= len(self._output_sequence) - 1:
+ self.mock_proc.returncode = self._return_value
+ return self.mock_proc.returncode
+
+ mock_read = mock.MagicMock(side_effect=read_side_effect)
+ mock_select = mock.MagicMock(side_effect=select_side_effect)
+ mock_time = mock.MagicMock(side_effect=time_side_effect)
+ self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)
+
+ # Set up but *do not start* the mocks.
+ self._mocks = [
+ mock.patch('fcntl.fcntl'),
+ mock.patch('os.read', new=mock_read),
+ mock.patch('select.select', new=mock_select),
+ mock.patch('time.time', new=mock_time),
+ ]
+
+ def __enter__(self):
+ for m in self._mocks:
+ m.__enter__()
+ return self.mock_proc
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for m in reversed(self._mocks):
+ m.__exit__(exc_type, exc_val, exc_tb)
+
+
+class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
+ """Test IterCmdOutputLines with some calls to the unix 'seq' command."""
+
+ # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
+ # can mock the process.
+ # pylint: disable=protected-access
+
+ _SIMPLE_OUTPUT_SEQUENCE = [
+ _ProcessOutputEvent(read_contents='1\n2\n'),
+ ]
+
+ def testIterCmdOutputLines_success(self):
+ with _MockProcess(
+ output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
+
+ def testIterCmdOutputLines_exitStatusFail(self):
+ with self.assertRaises(subprocess.CalledProcessError):
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
+ # after reading all the output we get an exit status of 1
+
+ def testIterCmdOutputLines_exitStatusIgnored(self):
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(
+ mock_proc, 'mock_proc', check_status=False),
+ 1):
+ self.assertEquals(num, int(line))
+
+ def testIterCmdOutputLines_exitStatusSkipped(self):
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
+ # no exception will be raised because we don't attempt to read past
+ # the end of the output and, thus, the status never gets checked
+ if num == 2:
+ break
+
+ def testIterCmdOutputLines_delay(self):
+ output_sequence = [
+ _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
+ _ProcessOutputEvent(read_contents=None, ts=2),
+ _ProcessOutputEvent(read_contents='Awake', ts=10),
+ ]
+ with _MockProcess(output_sequence=output_sequence) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
+ iter_timeout=5), 1):
+ if num <= 2:
+ self.assertEquals(num, int(line))
+ elif num == 3:
+ self.assertEquals(None, line)
+ elif num == 4:
+ self.assertEquals('Awake', line)
+ else:
+ self.fail()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/third_party/catapult/devil/devil/utils/file_utils.py b/third_party/catapult/devil/devil/utils/file_utils.py
new file mode 100644
index 0000000000..dc5a9efc94
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/file_utils.py
@@ -0,0 +1,31 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+
+
+def MergeFiles(dest_file, source_files):
+ """Merge list of files into single destination file.
+
+ Args:
+ dest_file: File to be written to.
+ source_files: List of files to be merged. Will be merged in the order they
+ appear in the list.
+ """
+ if not os.path.exists(os.path.dirname(dest_file)):
+ os.makedirs(os.path.dirname(dest_file))
+ try:
+ with open(dest_file, 'w') as dest_f:
+ for source_file in source_files:
+ with open(source_file, 'r') as source_f:
+ dest_f.write(source_f.read())
+ except Exception as e: # pylint: disable=broad-except
+ # Something went wrong when creating dest_file. Cleaning up.
+ try:
+ os.remove(dest_file)
+ except OSError:
+ pass
+ raise e
+
+
diff --git a/third_party/catapult/devil/devil/utils/find_usb_devices.py b/third_party/catapult/devil/devil/utils/find_usb_devices.py
new file mode 100755
index 0000000000..0e0f4d5666
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/find_usb_devices.py
@@ -0,0 +1,532 @@
+#!/usr/bin/python
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import re
+import sys
+import argparse
+
+from devil.utils import cmd_helper
+from devil.utils import usb_hubs
+from devil.utils import lsusb
+
+# Note: In the documentation below, "virtual port" refers to the port number
+# as observed by the system (e.g. by usb-devices) and "physical port" refers
+# to the physical numerical label on the physical port e.g. on a USB hub.
+# The mapping between virtual and physical ports is not always the identity
+# (e.g. the port labeled "1" on a USB hub does not always show up as "port 1"
+# when you plug something into it) but, as far as we are aware, the mapping
+# between virtual and physical ports is always the same for a given
+# model of USB hub. When "port number" is referenced without specifying, it
+# means the virtual port number.
+
+
+# Wrapper functions for system commands to get output. These are in wrapper
+# functions so that they can be more easily mocked-out for tests.
+def _GetParsedLSUSBOutput():
+ return lsusb.lsusb()
+
+
+def _GetUSBDevicesOutput():
+ return cmd_helper.GetCmdOutput(['usb-devices'])
+
+
+def _GetTtyUSBInfo(tty_string):
+ cmd = ['udevadm', 'info', '--name=/dev/' + tty_string, '--attribute-walk']
+ return cmd_helper.GetCmdOutput(cmd)
+
+
+def _GetCommList():
+ return cmd_helper.GetCmdOutput('ls /dev', shell=True)
+
+
+def GetTTYList():
+ return [x for x in _GetCommList().splitlines() if 'ttyUSB' in x]
+
+
+# Class to identify nodes in the USB topology. USB topology is organized as
+# a tree.
+class USBNode(object):
+ def __init__(self):
+ self._port_to_node = {}
+
+ @property
+ def desc(self):
+ raise NotImplementedError
+
+ @property
+ def info(self):
+ raise NotImplementedError
+
+ @property
+ def device_num(self):
+ raise NotImplementedError
+
+ @property
+ def bus_num(self):
+ raise NotImplementedError
+
+ def HasPort(self, port):
+ """Determines if this device has a device connected to the given port."""
+ return port in self._port_to_node
+
+ def PortToDevice(self, port):
+ """Gets the device connected to the given port on this device."""
+ return self._port_to_node[port]
+
+ def Display(self, port_chain='', info=False):
+ """Displays information about this node and its descendants.
+
+ Output format is, e.g. 1:3:3:Device 42 (ID 1234:5678 Some Device)
+ meaning that from the bus, if you look at the device connected
+ to port 1, then the device connected to port 3 of that,
+ then the device connected to port 3 of that, you get the device
+ assigned device number 42, which is Some Device. Note that device
+ numbers will be reassigned whenever a connected device is powercycled
+ or reinserted, but port numbers stay the same as long as the device
+ is reinserted back into the same physical port.
+
+ Args:
+ port_chain: [string] Chain of ports from bus to this node (e.g. '2:4:')
+ info: [bool] Whether to display detailed info as well.
+ """
+ raise NotImplementedError
+
+ def AddChild(self, port, device):
+ """Adds child to the device tree.
+
+ Args:
+ port: [int] Port number of the device.
+ device: [USBDeviceNode] Device to add.
+
+ Raises:
+ ValueError: If device already has a child at the given port.
+ """
+ if self.HasPort(port):
+ raise ValueError('Duplicate port number')
+ else:
+ self._port_to_node[port] = device
+
+ def AllNodes(self):
+ """Generator that yields this node and all of its descendants.
+
+ Yields:
+ [USBNode] First this node, then each of its descendants (recursively)
+ """
+ yield self
+ for child_node in self._port_to_node.values():
+ for descendant_node in child_node.AllNodes():
+ yield descendant_node
+
+ def FindDeviceNumber(self, findnum):
+ """Find device with given number in tree
+
+ Searches the portion of the device tree rooted at this node for
+ a device with the given device number.
+
+ Args:
+ findnum: [int] Device number to search for.
+
+ Returns:
+ [USBDeviceNode] Node that is found.
+ """
+ for node in self.AllNodes():
+ if node.device_num == findnum:
+ return node
+ return None
+
+
+class USBDeviceNode(USBNode):
+ def __init__(self, bus_num=0, device_num=0, serial=None, info=None):
+ """Class that represents a device in USB tree.
+
+ Args:
+ bus_num: [int] Bus number that this node is attached to.
+ device_num: [int] Device number of this device (or 0, if this is a bus)
+ serial: [string] Serial number.
+ info: [dict] Map giving detailed device info.
+ """
+ super(USBDeviceNode, self).__init__()
+ self._bus_num = bus_num
+ self._device_num = device_num
+ self._serial = serial
+ self._info = {} if info is None else info
+
+ #override
+ @property
+ def desc(self):
+ return self._info.get('desc')
+
+ #override
+ @property
+ def info(self):
+ return self._info
+
+ #override
+ @property
+ def device_num(self):
+ return self._device_num
+
+ #override
+ @property
+ def bus_num(self):
+ return self._bus_num
+
+ @property
+ def serial(self):
+ return self._serial
+
+ @serial.setter
+ def serial(self, serial):
+ self._serial = serial
+
+ #override
+ def Display(self, port_chain='', info=False):
+ print '%s Device %d (%s)' % (port_chain, self.device_num, self.desc)
+ if info:
+ print self.info
+ for (port, device) in self._port_to_node.iteritems():
+ device.Display('%s%d:' % (port_chain, port), info=info)
+
+
+class USBBusNode(USBNode):
+ def __init__(self, bus_num=0):
+ """Class that represents a node (either a bus or device) in USB tree.
+
+ Args:
+ is_bus: [bool] If true, node is bus; if not, node is device.
+ bus_num: [int] Bus number that this node is attached to.
+ device_num: [int] Device number of this device (or 0, if this is a bus)
+ desc: [string] Short description of device.
+ serial: [string] Serial number.
+ info: [dict] Map giving detailed device info.
+ port_to_dev: [dict(int:USBDeviceNode)]
+ Maps port # to device connected to port.
+ """
+ super(USBBusNode, self).__init__()
+ self._bus_num = bus_num
+
+ #override
+ @property
+ def desc(self):
+ return 'BUS %d' % self._bus_num
+
+ #override
+ @property
+ def info(self):
+ return {}
+
+ #override
+ @property
+ def device_num(self):
+ return -1
+
+ #override
+ @property
+ def bus_num(self):
+ return self._bus_num
+
+ #override
+ def Display(self, port_chain='', info=False):
+ print "=== %s ===" % self.desc
+ for (port, device) in self._port_to_node.iteritems():
+ device.Display('%s%d:' % (port_chain, port), info=info)
+
+
+_T_LINE_REGEX = re.compile(r'T: Bus=(?P<bus>\d{2}) Lev=(?P<lev>\d{2}) '
+ r'Prnt=(?P<prnt>\d{2,3}) Port=(?P<port>\d{2}) '
+ r'Cnt=(?P<cnt>\d{2}) Dev#=(?P<dev>.{3}) .*')
+
+_S_LINE_REGEX = re.compile(r'S: SerialNumber=(?P<serial>.*)')
+_LSUSB_BUS_DEVICE_RE = re.compile(r'^Bus (\d{3}) Device (\d{3}): (.*)')
+
+
+def GetBusNumberToDeviceTreeMap(fast=True):
+ """Gets devices currently attached.
+
+ Args:
+ fast [bool]: whether to do it fast (only get description, not
+ the whole dictionary, from lsusb)
+
+ Returns:
+ map of {bus number: bus object}
+ where the bus object has all the devices attached to it in a tree.
+ """
+ if fast:
+ info_map = {}
+ for line in lsusb.raw_lsusb().splitlines():
+ match = _LSUSB_BUS_DEVICE_RE.match(line)
+ if match:
+ info_map[(int(match.group(1)), int(match.group(2)))] = (
+ {'desc':match.group(3)})
+ else:
+ info_map = {((int(line['bus']), int(line['device']))): line
+ for line in _GetParsedLSUSBOutput()}
+
+
+ tree = {}
+ bus_num = -1
+ for line in _GetUSBDevicesOutput().splitlines():
+ match = _T_LINE_REGEX.match(line)
+ if match:
+ bus_num = int(match.group('bus'))
+ parent_num = int(match.group('prnt'))
+ # usb-devices starts counting ports from 0, so add 1
+ port_num = int(match.group('port')) + 1
+ device_num = int(match.group('dev'))
+
+ # create new bus if necessary
+ if bus_num not in tree:
+ tree[bus_num] = USBBusNode(bus_num=bus_num)
+
+ # create the new device
+ new_device = USBDeviceNode(bus_num=bus_num,
+ device_num=device_num,
+ info=info_map.get((bus_num, device_num),
+ {'desc': 'NOT AVAILABLE'}))
+
+ # add device to bus
+ if parent_num != 0:
+ tree[bus_num].FindDeviceNumber(parent_num).AddChild(
+ port_num, new_device)
+ else:
+ tree[bus_num].AddChild(port_num, new_device)
+
+ match = _S_LINE_REGEX.match(line)
+ if match:
+ if bus_num == -1:
+ raise ValueError('S line appears before T line in input file')
+ # put the serial number in the device
+ tree[bus_num].FindDeviceNumber(device_num).serial = match.group('serial')
+
+ return tree
+
+
+def GetHubsOnBus(bus, hub_types):
+ """Scans for all hubs on a bus of given hub types.
+
+ Args:
+ bus: [USBNode] Bus object.
+ hub_types: [iterable(usb_hubs.HubType)] Possible types of hubs.
+
+ Yields:
+ Sequence of tuples representing (hub, type of hub)
+ """
+ for device in bus.AllNodes():
+ for hub_type in hub_types:
+ if hub_type.IsType(device):
+ yield (device, hub_type)
+
+
+def GetPhysicalPortToNodeMap(hub, hub_type):
+ """Gets physical-port:node mapping for a given hub.
+ Args:
+ hub: [USBNode] Hub to get map for.
+ hub_type: [usb_hubs.HubType] Which type of hub it is.
+
+ Returns:
+ Dict of {physical port: node}
+ """
+ port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
+ return {port: device for (port, device) in port_device}
+
+
+def GetPhysicalPortToBusDeviceMap(hub, hub_type):
+ """Gets physical-port:(bus#, device#) mapping for a given hub.
+ Args:
+ hub: [USBNode] Hub to get map for.
+ hub_type: [usb_hubs.HubType] Which type of hub it is.
+
+ Returns:
+ Dict of {physical port: (bus number, device number)}
+ """
+ port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
+ return {port: (device.bus_num, device.device_num)
+ for (port, device) in port_device}
+
+
+def GetPhysicalPortToSerialMap(hub, hub_type):
+ """Gets physical-port:serial# mapping for a given hub.
+
+ Args:
+ hub: [USBNode] Hub to get map for.
+ hub_type: [usb_hubs.HubType] Which type of hub it is.
+
+ Returns:
+ Dict of {physical port: serial number)}
+ """
+ port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
+ return {port: device.serial
+ for (port, device) in port_device
+ if device.serial}
+
+
+def GetPhysicalPortToTTYMap(device, hub_type):
+ """Gets physical-port:tty-string mapping for a given hub.
+ Args:
+ hub: [USBNode] Hub to get map for.
+ hub_type: [usb_hubs.HubType] Which type of hub it is.
+
+ Returns:
+ Dict of {physical port: tty-string)}
+ """
+ port_device = hub_type.GetPhysicalPortToNodeTuples(device)
+ bus_device_to_tty = GetBusDeviceToTTYMap()
+ return {port: bus_device_to_tty[(device.bus_num, device.device_num)]
+ for (port, device) in port_device
+ if (device.bus_num, device.device_num) in bus_device_to_tty}
+
+
+def CollectHubMaps(hub_types, map_func, device_tree_map=None, fast=False):
+ """Runs a function on all hubs in the system and collects their output.
+
+ Args:
+ hub_types: [usb_hubs.HubType] List of possible hub types.
+ map_func: [string] Function to run on each hub.
+ device_tree: Previously constructed device tree map, if any.
+ fast: Whether to construct device tree fast, if not already provided
+
+ Yields:
+ Sequence of dicts of {physical port: device} where the type of
+ device depends on the ident keyword. Each dict is a separate hub.
+ """
+ if device_tree_map is None:
+ device_tree_map = GetBusNumberToDeviceTreeMap(fast=fast)
+ for bus in device_tree_map.values():
+ for (hub, hub_type) in GetHubsOnBus(bus, hub_types):
+ yield map_func(hub, hub_type)
+
+
+def GetAllPhysicalPortToNodeMaps(hub_types, **kwargs):
+ return CollectHubMaps(hub_types, GetPhysicalPortToNodeMap, **kwargs)
+
+
+def GetAllPhysicalPortToBusDeviceMaps(hub_types, **kwargs):
+ return CollectHubMaps(hub_types, GetPhysicalPortToBusDeviceMap, **kwargs)
+
+
+def GetAllPhysicalPortToSerialMaps(hub_types, **kwargs):
+ return CollectHubMaps(hub_types, GetPhysicalPortToSerialMap, **kwargs)
+
+
+def GetAllPhysicalPortToTTYMaps(hub_types, **kwargs):
+ return CollectHubMaps(hub_types, GetPhysicalPortToTTYMap, **kwargs)
+
+
+_BUS_NUM_REGEX = re.compile(r'.*ATTRS{busnum}=="(\d*)".*')
+_DEVICE_NUM_REGEX = re.compile(r'.*ATTRS{devnum}=="(\d*)".*')
+
+
+def GetBusDeviceFromTTY(tty_string):
+ """Gets bus and device number connected to a ttyUSB port.
+
+ Args:
+ tty_string: [String] Identifier for ttyUSB (e.g. 'ttyUSB0')
+
+ Returns:
+ Tuple (bus, device) giving device connected to that ttyUSB.
+
+ Raises:
+ ValueError: If bus and device information could not be found.
+ """
+ bus_num = None
+ device_num = None
+ # Expected output of GetCmdOutput should be something like:
+ # looking at device /devices/something/.../.../...
+ # KERNELS="ttyUSB0"
+ # SUBSYSTEMS=...
+ # DRIVERS=...
+ # ATTRS{foo}=...
+ # ATTRS{bar}=...
+ # ...
+ for line in _GetTtyUSBInfo(tty_string).splitlines():
+ bus_match = _BUS_NUM_REGEX.match(line)
+ device_match = _DEVICE_NUM_REGEX.match(line)
+ if bus_match and bus_num == None:
+ bus_num = int(bus_match.group(1))
+ if device_match and device_num == None:
+ device_num = int(device_match.group(1))
+ if bus_num is None or device_num is None:
+ raise ValueError('Info not found')
+ return (bus_num, device_num)
+
+
+def GetBusDeviceToTTYMap():
+ """Gets all mappings from (bus, device) to ttyUSB string.
+
+ Gets mapping from (bus, device) to ttyUSB string (e.g. 'ttyUSB0'),
+ for all ttyUSB strings currently active.
+
+ Returns:
+ [dict] Dict that maps (bus, device) to ttyUSB string
+ """
+ result = {}
+ for tty in GetTTYList():
+ result[GetBusDeviceFromTTY(tty)] = tty
+ return result
+
+
+# This dictionary described the mapping between physical and
+# virtual ports on a Plugable 7-Port Hub (model USB2-HUB7BC).
+# Keys are the virtual ports, values are the physical port.
+# The entry 4:{1:4, 2:3, 3:2, 4:1} indicates that virtual port
+# 4 connects to another 'virtual' hub that itself has the
+# virtual-to-physical port mapping {1:4, 2:3, 3:2, 4:1}.
+
+
+def TestUSBTopologyScript():
+ """Test display and hub identification."""
+ # Identification criteria for Plugable 7-Port Hub
+ print '==== USB TOPOLOGY SCRIPT TEST ===='
+
+ # Display devices
+ print '==== DEVICE DISPLAY ===='
+ device_trees = GetBusNumberToDeviceTreeMap()
+ for device_tree in device_trees.values():
+ device_tree.Display()
+ print
+
+ # Display TTY information about devices plugged into hubs.
+ print '==== TTY INFORMATION ===='
+ for port_map in GetAllPhysicalPortToTTYMaps(
+ usb_hubs.ALL_HUBS, device_tree_map=device_trees):
+ print port_map
+ print
+
+ # Display serial number information about devices plugged into hubs.
+ print '==== SERIAL NUMBER INFORMATION ===='
+ for port_map in GetAllPhysicalPortToSerialMaps(
+ usb_hubs.ALL_HUBS, device_tree_map=device_trees):
+ print port_map
+
+
+ return 0
+
+
+def parse_options(argv):
+ """Parses and checks the command-line options.
+
+ Returns:
+ A tuple containing the options structure and a list of categories to
+ be traced.
+ """
+ USAGE = '''./find_usb_devices [--help]
+ This script shows the mapping between USB devices and port numbers.
+ Clients are not intended to call this script from the command line.
+ Clients are intended to call the functions in this script directly.
+ For instance, GetAllPhysicalPortToSerialMaps(...)
+ Running this script with --help will display this message.
+ Running this script without --help will display information about
+ devices attached, TTY mapping, and serial number mapping,
+ for testing purposes. See design document for API documentation.
+ '''
+ parser = argparse.ArgumentParser(usage=USAGE)
+ return parser.parse_args(argv[1:])
+
+def main():
+ parse_options(sys.argv)
+ TestUSBTopologyScript()
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/third_party/catapult/devil/devil/utils/find_usb_devices_test.py b/third_party/catapult/devil/devil/utils/find_usb_devices_test.py
new file mode 100755
index 0000000000..e8b00c85ee
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/find_usb_devices_test.py
@@ -0,0 +1,379 @@
+#!/usr/bin/env python
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=protected-access
+
+"""
+Unit tests for the contents of find_usb_devices.py.
+
+Device tree for these tests is as follows:
+Bus 001:
+1: Device 011 "foo"
+2: Device 012 "bar"
+3: Device 013 "baz"
+
+Bus 002:
+1: Device 011 "quux"
+2: Device 020 "My Test HUB" #hub 1
+2:1: Device 021 "battor_p7_h1_t0" #physical port 7 on hub 1, on ttyUSB0
+2:3: Device 022 "battor_p5_h1_t1" #physical port 5 on hub 1, on ttyUSB1
+2:4: Device 023 "My Test Internal HUB" #internal section of hub 1
+2:4:2: Device 024 "battor_p3_h1_t2" #physical port 3 on hub 1, on ttyUSB2
+2:4:3: Device 026 "Not a Battery Monitor" #physical port 1 on hub 1, on ttyUSB3
+2:4:4: Device 025 "battor_p1_h1_t3" #physical port 1 on hub 1, on ttyUSB3
+3: Device 100 "My Test HUB" #hub 2
+3:4: Device 101 "My Test Internal HUB" #internal section of hub 2
+3:4:4: Device 102 "battor_p1_h2_t4" #physical port 1 on hub 2, on ttyusb4
+"""
+
+import logging
+import os
+import unittest
+
+from devil import devil_env
+from devil.utils import battor_device_mapping
+from devil.utils import find_usb_devices
+from devil.utils import lsusb
+from devil.utils import usb_hubs
+
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
+# Output of lsusb.lsusb().
+# We just test that the dictionary is working by creating an
+# "ID number" equal to (bus_num*1000)+device_num and seeing if
+# it is picked up correctly. Also we test the description
+
+DEVLIST = [(1, 11, 'foo'),
+ (1, 12, 'bar'),
+ (1, 13, 'baz'),
+ (2, 11, 'quux'),
+ (2, 20, 'My Test HUB'),
+ (2, 21, 'ID 0403:6001 battor_p7_h1_t0'),
+ (2, 22, 'ID 0403:6001 battor_p5_h1_t1'),
+ (2, 23, 'My Test Internal HUB'),
+ (2, 24, 'ID 0403:6001 battor_p3_h1_t2'),
+ (2, 25, 'ID 0403:6001 battor_p1_h1_t3'),
+ (2, 26, 'Not a Battery Monitor'),
+ (2, 100, 'My Test HUB'),
+ (2, 101, 'My Test Internal HUB'),
+ (2, 102, 'ID 0403:6001 battor_p1_h1_t4')]
+
+LSUSB_OUTPUT = [
+ {'bus': b, 'device': d, 'desc': t, 'id': (1000*b)+d}
+ for (b, d, t) in DEVLIST]
+
+
+# Note: "Lev", "Cnt", "Spd", and "MxCh" are not used by parser,
+# so we just leave them as zeros here. Also note that the port
+# numbers reported here start at 0, so they're 1 less than the
+# port numbers reported elsewhere.
+USB_DEVICES_OUTPUT = '''
+T: Bus=01 Lev=00 Prnt=00 Port=00 Cnt=00 Dev#= 11 Spd=000 MxCh=00
+S: SerialNumber=FooSerial
+T: Bus=01 Lev=00 Prnt=00 Port=01 Cnt=00 Dev#= 12 Spd=000 MxCh=00
+S: SerialNumber=BarSerial
+T: Bus=01 Lev=00 Prnt=00 Port=02 Cnt=00 Dev#= 13 Spd=000 MxCh=00
+S: SerialNumber=BazSerial
+
+T: Bus=02 Lev=00 Prnt=00 Port=00 Cnt=00 Dev#= 11 Spd=000 MxCh=00
+
+T: Bus=02 Lev=00 Prnt=00 Port=01 Cnt=00 Dev#= 20 Spd=000 MxCh=00
+T: Bus=02 Lev=00 Prnt=20 Port=00 Cnt=00 Dev#= 21 Spd=000 MxCh=00
+S: SerialNumber=BattOr0
+T: Bus=02 Lev=00 Prnt=20 Port=02 Cnt=00 Dev#= 22 Spd=000 MxCh=00
+S: SerialNumber=BattOr1
+T: Bus=02 Lev=00 Prnt=20 Port=03 Cnt=00 Dev#= 23 Spd=000 MxCh=00
+T: Bus=02 Lev=00 Prnt=23 Port=01 Cnt=00 Dev#= 24 Spd=000 MxCh=00
+S: SerialNumber=BattOr2
+T: Bus=02 Lev=00 Prnt=23 Port=03 Cnt=00 Dev#= 25 Spd=000 MxCh=00
+S: SerialNumber=BattOr3
+T: Bus=02 Lev=00 Prnt=23 Port=02 Cnt=00 Dev#= 26 Spd=000 MxCh=00
+
+T: Bus=02 Lev=00 Prnt=00 Port=02 Cnt=00 Dev#=100 Spd=000 MxCh=00
+T: Bus=02 Lev=00 Prnt=100 Port=03 Cnt=00 Dev#=101 Spd=000 MxCh=00
+T: Bus=02 Lev=00 Prnt=101 Port=03 Cnt=00 Dev#=102 Spd=000 MxCh=00
+'''
+
+RAW_LSUSB_OUTPUT = '''
+Bus 001 Device 011: FAST foo
+Bus 001 Device 012: FAST bar
+Bus 001 Device 013: baz
+Bus 002 Device 011: quux
+Bus 002 Device 020: My Test HUB
+Bus 002 Device 021: ID 0403:6001 battor_p7_h1_t0
+Bus 002 Device 022: ID 0403:6001 battor_p5_h1_t1
+Bus 002 Device 023: My Test Internal HUB
+Bus 002 Device 024: ID 0403:6001 battor_p3_h1_t2
+Bus 002 Device 025: ID 0403:6001 battor_p1_h1_t3
+Bus 002 Device 026: Not a Battery Monitor
+Bus 002 Device 100: My Test HUB
+Bus 002 Device 101: My Test Internal HUB
+Bus 002 Device 102: ID 0403:6001 battor_p1_h1_t4
+'''
+
+LIST_TTY_OUTPUT = '''
+ttyUSB0
+Something-else-0
+ttyUSB1
+ttyUSB2
+Something-else-1
+ttyUSB3
+ttyUSB4
+Something-else-2
+ttyUSB5
+'''
+
+# Note: The real output will have multiple lines with
+# ATTRS{busnum} and ATTRS{devnum}, but only the first
+# one counts. Thus the test output duplicates this.
+UDEVADM_USBTTY0_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="21"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_USBTTY1_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="22"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_USBTTY2_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="24"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_USBTTY3_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="25"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_USBTTY4_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="102"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_USBTTY5_OUTPUT = '''
+ATTRS{busnum}=="2"
+ATTRS{devnum}=="26"
+ATTRS{busnum}=="0"
+ATTRS{devnum}=="0"
+'''
+
+UDEVADM_OUTPUT_DICT = {
+ 'ttyUSB0': UDEVADM_USBTTY0_OUTPUT,
+ 'ttyUSB1': UDEVADM_USBTTY1_OUTPUT,
+ 'ttyUSB2': UDEVADM_USBTTY2_OUTPUT,
+ 'ttyUSB3': UDEVADM_USBTTY3_OUTPUT,
+ 'ttyUSB4': UDEVADM_USBTTY4_OUTPUT,
+ 'ttyUSB5': UDEVADM_USBTTY5_OUTPUT}
+
+# Identification criteria for Plugable 7-Port Hub
+def isTestHub(node):
+ """Check if a node is a Plugable 7-Port Hub
+ (Model USB2-HUB7BC)
+ The topology of this device is a 4-port hub,
+ with another 4-port hub connected on port 4.
+ """
+ if not isinstance(node, find_usb_devices.USBDeviceNode):
+ return False
+ if 'Test HUB' not in node.desc:
+ return False
+ if not node.HasPort(4):
+ return False
+ return 'Test Internal HUB' in node.PortToDevice(4).desc
+
+TEST_HUB = usb_hubs.HubType(isTestHub,
+ {1:7,
+ 2:6,
+ 3:5,
+ 4:{1:4, 2:3, 3:2, 4:1}})
+
+class USBScriptTest(unittest.TestCase):
+ def setUp(self):
+ find_usb_devices._GetTtyUSBInfo = mock.Mock(
+ side_effect=lambda x: UDEVADM_OUTPUT_DICT[x])
+ find_usb_devices._GetParsedLSUSBOutput = mock.Mock(
+ return_value=LSUSB_OUTPUT)
+ find_usb_devices._GetUSBDevicesOutput = mock.Mock(
+ return_value=USB_DEVICES_OUTPUT)
+ find_usb_devices._GetCommList = mock.Mock(
+ return_value=LIST_TTY_OUTPUT)
+ lsusb.raw_lsusb = mock.Mock(
+ return_value=RAW_LSUSB_OUTPUT)
+
+ def testIsBattOr(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap()
+ self.assertTrue(battor_device_mapping.IsBattOr('ttyUSB3', bd))
+ self.assertFalse(battor_device_mapping.IsBattOr('ttyUSB5', bd))
+
+ def testGetBattOrs(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap()
+ self.assertEquals(battor_device_mapping.GetBattOrList(bd),
+ ['ttyUSB0', 'ttyUSB1', 'ttyUSB2',
+ 'ttyUSB3', 'ttyUSB4'])
+
+ def testGetTTYDevices(self):
+ pp = find_usb_devices.GetAllPhysicalPortToTTYMaps([TEST_HUB])
+ result = list(pp)
+ self.assertEquals(result[0], {7:'ttyUSB0',
+ 5:'ttyUSB1',
+ 3:'ttyUSB2',
+ 2:'ttyUSB5',
+ 1:'ttyUSB3'})
+ self.assertEquals(result[1], {1:'ttyUSB4'})
+
+ def testGetPortDeviceMapping(self):
+ pp = find_usb_devices.GetAllPhysicalPortToBusDeviceMaps([TEST_HUB])
+ result = list(pp)
+ self.assertEquals(result[0], {7:(2, 21),
+ 5:(2, 22),
+ 3:(2, 24),
+ 2:(2, 26),
+ 1:(2, 25)})
+ self.assertEquals(result[1], {1:(2, 102)})
+
+ def testGetSerialMapping(self):
+ pp = find_usb_devices.GetAllPhysicalPortToSerialMaps([TEST_HUB])
+ result = list(pp)
+ self.assertEquals(result[0], {7:'BattOr0',
+ 5:'BattOr1',
+ 3:'BattOr2',
+ 1:'BattOr3'})
+ self.assertEquals(result[1], {})
+
+ def testFastDeviceDescriptions(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap()
+ dev_foo = bd[1].FindDeviceNumber(11)
+ dev_bar = bd[1].FindDeviceNumber(12)
+ dev_battor_p7_h1_t0 = bd[2].FindDeviceNumber(21)
+ self.assertEquals(dev_foo.desc, 'FAST foo')
+ self.assertEquals(dev_bar.desc, 'FAST bar')
+ self.assertEquals(dev_battor_p7_h1_t0.desc,
+ 'ID 0403:6001 battor_p7_h1_t0')
+
+ def testDeviceDescriptions(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap(fast=False)
+ dev_foo = bd[1].FindDeviceNumber(11)
+ dev_bar = bd[1].FindDeviceNumber(12)
+ dev_battor_p7_h1_t0 = bd[2].FindDeviceNumber(21)
+ self.assertEquals(dev_foo.desc, 'foo')
+ self.assertEquals(dev_bar.desc, 'bar')
+ self.assertEquals(dev_battor_p7_h1_t0.desc,
+ 'ID 0403:6001 battor_p7_h1_t0')
+
+ def testDeviceInformation(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap(fast=False)
+ dev_foo = bd[1].FindDeviceNumber(11)
+ dev_bar = bd[1].FindDeviceNumber(12)
+ dev_battor_p7_h1_t0 = bd[2].FindDeviceNumber(21)
+ self.assertEquals(dev_foo.info['id'], 1011)
+ self.assertEquals(dev_bar.info['id'], 1012)
+ self.assertEquals(dev_battor_p7_h1_t0.info['id'], 2021)
+
+ def testSerialNumber(self):
+ bd = find_usb_devices.GetBusNumberToDeviceTreeMap(fast=False)
+ dev_foo = bd[1].FindDeviceNumber(11)
+ dev_bar = bd[1].FindDeviceNumber(12)
+ dev_battor_p7_h1_t0 = bd[2].FindDeviceNumber(21)
+ self.assertEquals(dev_foo.serial, 'FooSerial')
+ self.assertEquals(dev_bar.serial, 'BarSerial')
+ self.assertEquals(dev_battor_p7_h1_t0.serial, 'BattOr0')
+
+ def testBattOrDictMapping(self):
+ map_dict = {'Phone1':'BattOr1', 'Phone2':'BattOr2', 'Phone3':'BattOr3'}
+ a1 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone1', serial_map=map_dict)
+ a2 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone2', serial_map=map_dict)
+ a3 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone3', serial_map=map_dict)
+ self.assertEquals(a1, '/dev/ttyUSB1')
+ self.assertEquals(a2, '/dev/ttyUSB2')
+ self.assertEquals(a3, '/dev/ttyUSB3')
+
+ def testBattOrDictFromFileMapping(self):
+ try:
+ map_dict = {'Phone1':'BattOr1', 'Phone2':'BattOr2', 'Phone3':'BattOr3'}
+ curr_dir = os.path.dirname(os.path.realpath(__file__))
+ filename = os.path.join(curr_dir, 'test', 'data', 'test_write_map.json')
+ battor_device_mapping.WriteSerialMapFile(filename, map_dict)
+ a1 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone1', serial_map_file=filename)
+ a2 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone2', serial_map_file=filename)
+ a3 = battor_device_mapping.GetBattOrPathFromPhoneSerial(
+ 'Phone3', serial_map_file=filename)
+ finally:
+ os.remove(filename)
+ self.assertEquals(a1, '/dev/ttyUSB1')
+ self.assertEquals(a2, '/dev/ttyUSB2')
+ self.assertEquals(a3, '/dev/ttyUSB3')
+
+ def testReadSerialMapFile(self):
+ curr_dir = os.path.dirname(os.path.realpath(__file__))
+ map_dict = battor_device_mapping.ReadSerialMapFile(
+ os.path.join(curr_dir, 'test', 'data', 'test_serial_map.json'))
+ self.assertEquals(len(map_dict.keys()), 3)
+ self.assertEquals(map_dict['Phone1'], 'BattOr1')
+ self.assertEquals(map_dict['Phone2'], 'BattOr2')
+ self.assertEquals(map_dict['Phone3'], 'BattOr3')
+
+original_PPTSM = find_usb_devices.GetAllPhysicalPortToSerialMaps
+original_PPTTM = find_usb_devices.GetAllPhysicalPortToTTYMaps
+original_GBL = battor_device_mapping.GetBattOrList
+original_GBNDM = find_usb_devices.GetBusNumberToDeviceTreeMap
+original_IB = battor_device_mapping.IsBattOr
+original_GBSM = battor_device_mapping.GetBattOrSerialNumbers
+
+def setup_battor_test(serial, tty, battor, bser=None):
+ serial_mapper = mock.Mock(return_value=serial)
+ tty_mapper = mock.Mock(return_value=tty)
+ battor_lister = mock.Mock(return_value=battor)
+ devtree = mock.Mock(return_value=None)
+ is_battor = mock.Mock(side_effect=lambda x, y: x in battor)
+ battor_serials = mock.Mock(return_value=bser)
+ find_usb_devices.GetAllPhysicalPortToSerialMaps = serial_mapper
+ find_usb_devices.GetAllPhysicalPortToTTYMaps = tty_mapper
+ battor_device_mapping.GetBattOrList = battor_lister
+ find_usb_devices.GetBusNumberToDeviceTreeMap = devtree
+ battor_device_mapping.IsBattOr = is_battor
+ battor_device_mapping.GetBattOrSerialNumbers = battor_serials
+
+class BattOrMappingTest(unittest.TestCase):
+ def tearDown(self):
+ find_usb_devices.GetAllPhysicalPortToSerialMaps = original_PPTSM
+ find_usb_devices.GetAllPhysicalPortToTTYMaps = original_PPTTM
+ battor_device_mapping.GetBattOrList = original_GBL
+ find_usb_devices.GetBusNumberToDeviceTreeMap = original_GBNDM
+ battor_device_mapping.IsBattOr = original_IB
+ battor_device_mapping.GetBattOrSerialNumbers = original_GBSM
+
+ def test_generate_serial_map(self):
+ setup_battor_test([{1:'Phn1', 2:'Phn2', 3:'Phn3'},
+ {1:'Bat1', 2:'Bat2', 3:'Bat3'}],
+ [{},
+ {1:'ttyUSB0', 2:'ttyUSB1', 3:'ttyUSB2'}],
+ ['ttyUSB0', 'ttyUSB1', 'ttyUSB2'],
+ ['Bat1', 'Bat2', 'Bat3'])
+ result = battor_device_mapping.GenerateSerialMap()
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result['Phn1'], 'Bat1')
+ self.assertEqual(result['Phn2'], 'Bat2')
+ self.assertEqual(result['Phn3'], 'Bat3')
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.DEBUG)
+ unittest.main(verbosity=2)
diff --git a/third_party/catapult/devil/devil/utils/geometry.py b/third_party/catapult/devil/devil/utils/geometry.py
new file mode 100644
index 0000000000..da21770b3e
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/geometry.py
@@ -0,0 +1,75 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Objects for convenient manipulation of points and other surface areas."""
+
+import collections
+
+
+class Point(collections.namedtuple('Point', ['x', 'y'])):
+ """Object to represent an (x, y) point on a surface.
+
+ Args:
+ x, y: Two numeric coordinates that define the point.
+ """
+ __slots__ = ()
+
+ def __str__(self):
+ """Get a useful string representation of the object."""
+ return '(%s, %s)' % (self.x, self.y)
+
+ def __add__(self, other):
+ """Sum of two points, e.g. p + q."""
+ if isinstance(other, Point):
+ return Point(self.x + other.x, self.y + other.y)
+ else:
+ return NotImplemented
+
+ def __mul__(self, factor):
+ """Multiplication on the right is not implemented."""
+ # This overrides the default behaviour of a tuple multiplied by a constant
+ # on the right, which does not make sense for a Point.
+ return NotImplemented
+
+ def __rmul__(self, factor):
+ """Multiply a point by a scalar factor on the left, e.g. 2 * p."""
+ return Point(factor * self.x, factor * self.y)
+
+
+class Rectangle(
+ collections.namedtuple('Rectangle', ['top_left', 'bottom_right'])):
+ """Object to represent a rectangle on a surface.
+
+ Args:
+ top_left: A pair of (left, top) coordinates. Might be given as a Point
+ or as a two-element sequence (list, tuple, etc.).
+ bottom_right: A pair (right, bottom) coordinates.
+ """
+ __slots__ = ()
+
+ def __new__(cls, top_left, bottom_right):
+ if not isinstance(top_left, Point):
+ top_left = Point(*top_left)
+ if not isinstance(bottom_right, Point):
+ bottom_right = Point(*bottom_right)
+ return super(Rectangle, cls).__new__(cls, top_left, bottom_right)
+
+ def __str__(self):
+ """Get a useful string representation of the object."""
+ return '[%s, %s]' % (self.top_left, self.bottom_right)
+
+ @property
+ def center(self):
+ """Get the point at the center of the rectangle."""
+ return 0.5 * (self.top_left + self.bottom_right)
+
+ @classmethod
+ def FromDict(cls, d):
+ """Create a rectangle object from a dictionary.
+
+ Args:
+ d: A dictionary (or mapping) of the form, e.g., {'top': 0, 'left': 0,
+ 'bottom': 1, 'right': 1}.
+ """
+ return cls(Point(d['left'], d['top']), Point(d['right'], d['bottom']))
diff --git a/third_party/catapult/devil/devil/utils/geometry_test.py b/third_party/catapult/devil/devil/utils/geometry_test.py
new file mode 100644
index 0000000000..af69442930
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/geometry_test.py
@@ -0,0 +1,61 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for the geometry module."""
+
+import unittest
+
+from devil.utils import geometry as g
+
+
+class PointTest(unittest.TestCase):
+
+ def testStr(self):
+ p = g.Point(1, 2)
+ self.assertEquals(str(p), '(1, 2)')
+
+ def testAdd(self):
+ p = g.Point(1, 2)
+ q = g.Point(3, 4)
+ r = g.Point(4, 6)
+ self.assertEquals(p + q, r)
+
+ def testAdd_TypeErrorWithInvalidOperands(self):
+ # pylint: disable=pointless-statement
+ p = g.Point(1, 2)
+ with self.assertRaises(TypeError):
+ p + 4 # Can't add point and scalar.
+ with self.assertRaises(TypeError):
+ 4 + p # Can't add scalar and point.
+
+ def testMult(self):
+ p = g.Point(1, 2)
+ r = g.Point(2, 4)
+ self.assertEquals(2 * p, r) # Multiply by scalar on the left.
+
+ def testMult_TypeErrorWithInvalidOperands(self):
+ # pylint: disable=pointless-statement
+ p = g.Point(1, 2)
+ q = g.Point(2, 4)
+ with self.assertRaises(TypeError):
+ p * q # Can't multiply points.
+ with self.assertRaises(TypeError):
+ p * 4 # Can't multiply by a scalar on the right.
+
+
+class RectangleTest(unittest.TestCase):
+
+ def testStr(self):
+ r = g.Rectangle(g.Point(0, 1), g.Point(2, 3))
+ self.assertEquals(str(r), '[(0, 1), (2, 3)]')
+
+ def testCenter(self):
+ r = g.Rectangle(g.Point(0, 1), g.Point(2, 3))
+ c = g.Point(1, 2)
+ self.assertEquals(r.center, c)
+
+ def testFromJson(self):
+ r1 = g.Rectangle(g.Point(0, 1), g.Point(2, 3))
+ r2 = g.Rectangle.FromDict({'top': 1, 'left': 0, 'bottom': 3, 'right': 2})
+ self.assertEquals(r1, r2)
diff --git a/third_party/catapult/devil/devil/utils/host_utils.py b/third_party/catapult/devil/devil/utils/host_utils.py
new file mode 100644
index 0000000000..580721f127
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/host_utils.py
@@ -0,0 +1,16 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+
+
+def GetRecursiveDiskUsage(path):
+ """Returns the disk usage in bytes of |path|. Similar to `du -sb |path|`."""
+ running_size = os.path.getsize(path)
+ if os.path.isdir(path):
+ for root, dirs, files in os.walk(path):
+ running_size += sum([os.path.getsize(os.path.join(root, f))
+ for f in files + dirs])
+ return running_size
+
diff --git a/third_party/catapult/devil/devil/utils/lazy/__init__.py b/third_party/catapult/devil/devil/utils/lazy/__init__.py
new file mode 100644
index 0000000000..3cc56c0acf
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/lazy/__init__.py
@@ -0,0 +1,5 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from devil.utils.lazy.weak_constant import WeakConstant
diff --git a/third_party/catapult/devil/devil/utils/lazy/weak_constant.py b/third_party/catapult/devil/devil/utils/lazy/weak_constant.py
new file mode 100644
index 0000000000..3558f29ac6
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/lazy/weak_constant.py
@@ -0,0 +1,29 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import threading
+
+
+class WeakConstant(object):
+ """A thread-safe, lazily initialized object.
+
+ This does not support modification after initialization. The intended
+ constant nature of the object is not enforced, though, hence the "weak".
+ """
+
+ def __init__(self, initializer):
+ self._initialized = False
+ self._initializer = initializer
+ self._lock = threading.Lock()
+ self._val = None
+
+ def read(self):
+ """Get the object, creating it if necessary."""
+ if self._initialized:
+ return self._val
+ with self._lock:
+ if not self._initialized:
+ self._val = self._initializer()
+ self._initialized = True
+ return self._val
diff --git a/third_party/catapult/devil/devil/utils/lsusb.py b/third_party/catapult/devil/devil/utils/lsusb.py
new file mode 100644
index 0000000000..6cbf2567b9
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/lsusb.py
@@ -0,0 +1,174 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import re
+
+from devil.utils import cmd_helper
+
+logger = logging.getLogger(__name__)
+
+_COULDNT_OPEN_ERROR_RE = re.compile(r'Couldn\'t open device.*')
+_INDENTATION_RE = re.compile(r'^( *)')
+_LSUSB_BUS_DEVICE_RE = re.compile(r'^Bus (\d{3}) Device (\d{3}): (.*)')
+_LSUSB_ENTRY_RE = re.compile(r'^ *([^ ]+) +([^ ]+) *([^ ].*)?$')
+_LSUSB_GROUP_RE = re.compile(r'^ *([^ ]+.*):$')
+
+
+def _lsusbv_on_device(bus_id, dev_id):
+ """Calls lsusb -v on device."""
+ _, raw_output = cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb', '-v', '-s', '%s:%s' % (bus_id, dev_id)], timeout=10)
+
+ device = {'bus': bus_id, 'device': dev_id}
+ depth_stack = [device]
+
+ # This builds a nested dict -- a tree, basically -- that corresponds
+ # to the lsusb output. It looks first for a line containing
+ #
+ # "Bus <bus number> Device <device number>: ..."
+ #
+ # and uses that to create the root node. It then parses all remaining
+ # lines as a tree, with the indentation level determining the
+ # depth of the new node.
+ #
+ # This expects two kinds of lines:
+ # - "groups", which take the form
+ # "<Group name>:"
+ # and typically have children, and
+ # - "entries", which take the form
+ # "<entry name> <entry value> <possible entry description>"
+ # and typically do not have children (but can).
+ #
+ # This maintains a stack containing all current ancestor nodes in
+ # order to add new nodes to the proper place in the tree.
+ # The stack is added to when a new node is parsed. Nodes are removed
+ # from the stack when they are either at the same indentation level as
+ # or a deeper indentation level than the current line.
+ #
+ # e.g. the following lsusb output:
+ #
+ # Bus 123 Device 456: School bus
+ # Device Descriptor:
+ # bDeviceClass 5 Actual School Bus
+ # Configuration Descriptor:
+ # bLength 20 Rows
+ #
+ # would produce the following dict:
+ #
+ # {
+ # 'bus': 123,
+ # 'device': 456,
+ # 'desc': 'School bus',
+ # 'Device Descriptor': {
+ # 'bDeviceClass': {
+ # '_value': '5',
+ # '_desc': 'Actual School Bus',
+ # },
+ # 'Configuration Descriptor': {
+ # 'bLength': {
+ # '_value': '20',
+ # '_desc': 'Rows',
+ # },
+ # },
+ # }
+ # }
+ for line in raw_output.splitlines():
+ # Ignore blank lines.
+ if not line:
+ continue
+ # Filter out error mesage about opening device.
+ if _COULDNT_OPEN_ERROR_RE.match(line):
+ continue
+ # Find start of device information.
+ m = _LSUSB_BUS_DEVICE_RE.match(line)
+ if m:
+ if m.group(1) != bus_id:
+ logger.warning(
+ 'Expected bus_id value: %r, seen %r', bus_id, m.group(1))
+ if m.group(2) != dev_id:
+ logger.warning(
+ 'Expected dev_id value: %r, seen %r', dev_id, m.group(2))
+ device['desc'] = m.group(3)
+ continue
+
+ # Skip any lines that aren't indented, as they're not part of the
+ # device descriptor.
+ indent_match = _INDENTATION_RE.match(line)
+ if not indent_match:
+ continue
+
+ # Determine the indentation depth.
+ depth = 1 + len(indent_match.group(1)) / 2
+ if depth > len(depth_stack):
+ logger.error(
+ 'lsusb parsing error: unexpected indentation: "%s"', line)
+ continue
+
+ # Pop everything off the depth stack that isn't a parent of
+ # this element.
+ while depth < len(depth_stack):
+ depth_stack.pop()
+
+ cur = depth_stack[-1]
+
+ m = _LSUSB_GROUP_RE.match(line)
+ if m:
+ new_group = {}
+ cur[m.group(1)] = new_group
+ depth_stack.append(new_group)
+ continue
+
+ m = _LSUSB_ENTRY_RE.match(line)
+ if m:
+ new_entry = {
+ '_value': m.group(2),
+ '_desc': m.group(3),
+ }
+ cur[m.group(1)] = new_entry
+ depth_stack.append(new_entry)
+ continue
+
+ logger.error('lsusb parsing error: unrecognized line: "%s"', line)
+
+ return device
+
+def lsusb():
+ """Call lsusb and return the parsed output."""
+ _, lsusb_list_output = cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb'], timeout=10)
+ devices = []
+ for line in lsusb_list_output.splitlines():
+ m = _LSUSB_BUS_DEVICE_RE.match(line)
+ if m:
+ bus_num = m.group(1)
+ dev_num = m.group(2)
+ try:
+ devices.append(_lsusbv_on_device(bus_num, dev_num))
+ except cmd_helper.TimeoutError:
+ # Will be blacklisted if it is in expected device file, but times out.
+ logger.info('lsusb -v %s:%s timed out.', bus_num, dev_num)
+ return devices
+
+def raw_lsusb():
+ return cmd_helper.GetCmdOutput(['lsusb'])
+
+def get_lsusb_serial(device):
+ try:
+ return device['Device Descriptor']['iSerial']['_desc']
+ except KeyError:
+ return None
+
+def _is_android_device(device):
+ try:
+ # Hubs are not android devices.
+ if device['Device Descriptor']['bDeviceClass']['_value'] == '9':
+ return False
+ except KeyError:
+ pass
+ return get_lsusb_serial(device) is not None
+
+def get_android_devices():
+ android_devices = (d for d in lsusb() if _is_android_device(d))
+ return [get_lsusb_serial(d) for d in android_devices]
diff --git a/third_party/catapult/devil/devil/utils/lsusb_test.py b/third_party/catapult/devil/devil/utils/lsusb_test.py
new file mode 100755
index 0000000000..f381e72f10
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/lsusb_test.py
@@ -0,0 +1,250 @@
+#!/usr/bin/env python
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for the cmd_helper module."""
+
+import unittest
+
+from devil import devil_env
+from devil.utils import lsusb
+from devil.utils import mock_calls
+
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
+RAW_OUTPUT = """
+Bus 003 Device 007: ID 18d1:4ee2 Google Inc. Nexus 4 (debug)
+Device Descriptor:
+ bLength 18
+ bDescriptorType 1
+ bcdUSB 2.00
+ bDeviceClass 0 (Defined at Interface level)
+ bDeviceSubClass 0
+ bDeviceProtocol 0
+ bMaxPacketSize0 64
+ idVendor 0x18d1 Google Inc.
+ idProduct 0x4ee2 Nexus 4 (debug)
+ bcdDevice 2.28
+ iManufacturer 1 LGE
+ iProduct 2 Nexus 4
+ iSerial 3 01d2450ea194a93b
+ bNumConfigurations 1
+ Configuration Descriptor:
+ bLength 9
+ bDescriptorType 2
+ wTotalLength 62
+ bNumInterfaces 2
+ bConfigurationValue 1
+ iConfiguration 0
+ bmAttributes 0x80
+ (Bus Powered)
+ MaxPower 500mA
+ Interface Descriptor:
+ bLength 9
+ bDescriptorType 4
+ bInterfaceNumber 0
+ bAlternateSetting 0
+ bNumEndpoints 3
+ bInterfaceClass 255 Vendor Specific Class
+ bInterfaceSubClass 255 Vendor Specific Subclass
+ bInterfaceProtocol 0
+ iInterface 4 MTP
+ Endpoint Descriptor:
+ bLength 7
+ bDescriptorType 5
+ bEndpointAddress 0x81 EP 1 IN
+ bmAttributes 2
+ Transfer Type Bulk
+ Synch Type None
+ Usage Type Data
+ wMaxPacketSize 0x0040 1x 64 bytes
+ bInterval 0
+ Endpoint Descriptor:
+ bLength 7
+ bDescriptorType 5
+ bEndpointAddress 0x01 EP 1 OUT
+ bmAttributes 2
+ Transfer Type Bulk
+ Synch Type None
+ Usage Type Data
+ wMaxPacketSize 0x0040 1x 64 bytes
+ bInterval 0
+ Endpoint Descriptor:
+ bLength 7
+ bDescriptorType 5
+ bEndpointAddress 0x82 EP 2 IN
+ bmAttributes 3
+ Transfer Type Interrupt
+ Synch Type None
+ Usage Type Data
+ wMaxPacketSize 0x001c 1x 28 bytes
+ bInterval 6
+ Interface Descriptor:
+ bLength 9
+ bDescriptorType 4
+ bInterfaceNumber 1
+ bAlternateSetting 0
+ bNumEndpoints 2
+ bInterfaceClass 255 Vendor Specific Class
+ bInterfaceSubClass 66
+ bInterfaceProtocol 1
+ iInterface 0
+ Endpoint Descriptor:
+ bLength 7
+ bDescriptorType 5
+ bEndpointAddress 0x83 EP 3 IN
+ bmAttributes 2
+ Transfer Type Bulk
+ Synch Type None
+ Usage Type Data
+ wMaxPacketSize 0x0040 1x 64 bytes
+ bInterval 0
+ Endpoint Descriptor:
+ bLength 7
+ bDescriptorType 5
+ bEndpointAddress 0x02 EP 2 OUT
+ bmAttributes 2
+ Transfer Type Bulk
+ Synch Type None
+ Usage Type Data
+ wMaxPacketSize 0x0040 1x 64 bytes
+ bInterval 0
+Device Qualifier (for other device speed):
+ bLength 10
+ bDescriptorType 6
+ bcdUSB 2.00
+ bDeviceClass 0 (Defined at Interface level)
+ bDeviceSubClass 0
+ bDeviceProtocol 0
+ bMaxPacketSize0 64
+ bNumConfigurations 1
+Device Status: 0x0000
+ (Bus Powered)
+"""
+DEVICE_LIST = 'Bus 003 Device 007: ID 18d1:4ee2 Google Inc. Nexus 4 (debug)'
+
+EXPECTED_RESULT = {
+ 'device': '007',
+ 'bus': '003',
+ 'desc': 'ID 18d1:4ee2 Google Inc. Nexus 4 (debug)',
+ 'Device': {
+ '_value': 'Status:',
+ '_desc': '0x0000',
+ '(Bus': {
+ '_value': 'Powered)',
+ '_desc': None
+ }
+ },
+ 'Device Descriptor': {
+ 'bLength': {'_value': '18', '_desc': None},
+ 'bcdDevice': {'_value': '2.28', '_desc': None},
+ 'bDeviceSubClass': {'_value': '0', '_desc': None},
+ 'idVendor': {'_value': '0x18d1', '_desc': 'Google Inc.'},
+ 'bcdUSB': {'_value': '2.00', '_desc': None},
+ 'bDeviceProtocol': {'_value': '0', '_desc': None},
+ 'bDescriptorType': {'_value': '1', '_desc': None},
+ 'Configuration Descriptor': {
+ 'bLength': {'_value': '9', '_desc': None},
+ 'wTotalLength': {'_value': '62', '_desc': None},
+ 'bConfigurationValue': {'_value': '1', '_desc': None},
+ 'Interface Descriptor': {
+ 'bLength': {'_value': '9', '_desc': None},
+ 'bAlternateSetting': {'_value': '0', '_desc': None},
+ 'bInterfaceNumber': {'_value': '1', '_desc': None},
+ 'bNumEndpoints': {'_value': '2', '_desc': None},
+ 'bDescriptorType': {'_value': '4', '_desc': None},
+ 'bInterfaceSubClass': {'_value': '66', '_desc': None},
+ 'bInterfaceClass': {
+ '_value': '255',
+ '_desc': 'Vendor Specific Class'
+ },
+ 'bInterfaceProtocol': {'_value': '1', '_desc': None},
+ 'Endpoint Descriptor': {
+ 'bLength': {'_value': '7', '_desc': None},
+ 'bEndpointAddress': {'_value': '0x02', '_desc': 'EP 2 OUT'},
+ 'bInterval': {'_value': '0', '_desc': None},
+ 'bDescriptorType': {'_value': '5', '_desc': None},
+ 'bmAttributes': {
+ '_value': '2',
+ 'Transfer': {'_value': 'Type', '_desc': 'Bulk'},
+ 'Usage': {'_value': 'Type', '_desc': 'Data'},
+ '_desc': None,
+ 'Synch': {'_value': 'Type', '_desc': 'None'}
+ },
+ 'wMaxPacketSize': {
+ '_value': '0x0040',
+ '_desc': '1x 64 bytes'
+ }
+ },
+ 'iInterface': {'_value': '0', '_desc': None}
+ },
+ 'bDescriptorType': {'_value': '2', '_desc': None},
+ 'iConfiguration': {'_value': '0', '_desc': None},
+ 'bmAttributes': {
+ '_value': '0x80',
+ '_desc': None,
+ '(Bus': {'_value': 'Powered)', '_desc': None}
+ },
+ 'bNumInterfaces': {'_value': '2', '_desc': None},
+ 'MaxPower': {'_value': '500mA', '_desc': None}
+ },
+ 'iSerial': {'_value': '3', '_desc': '01d2450ea194a93b'},
+ 'idProduct': {'_value': '0x4ee2', '_desc': 'Nexus 4 (debug)'},
+ 'iManufacturer': {'_value': '1', '_desc': 'LGE'},
+ 'bDeviceClass': {
+ '_value': '0',
+ '_desc': '(Defined at Interface level)'
+ },
+ 'iProduct': {'_value': '2', '_desc': 'Nexus 4'},
+ 'bMaxPacketSize0': {'_value': '64', '_desc': None},
+ 'bNumConfigurations': {'_value': '1', '_desc': None}
+ },
+ 'Device Qualifier (for other device speed)': {
+ 'bLength': {'_value': '10', '_desc': None},
+ 'bNumConfigurations': {'_value': '1', '_desc': None},
+ 'bDeviceSubClass': {'_value': '0', '_desc': None},
+ 'bcdUSB': {'_value': '2.00', '_desc': None},
+ 'bDeviceProtocol': {'_value': '0', '_desc': None},
+ 'bDescriptorType': {'_value': '6', '_desc': None},
+ 'bDeviceClass': {
+ '_value': '0',
+ '_desc': '(Defined at Interface level)'
+ },
+ 'bMaxPacketSize0': {'_value': '64', '_desc': None}
+ }
+}
+
+
+class LsusbTest(mock_calls.TestCase):
+ """Test Lsusb parsing."""
+
+ def testLsusb(self):
+ with self.assertCalls(
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb'], timeout=10), (None, DEVICE_LIST)),
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb', '-v', '-s', '003:007'], timeout=10), (None, RAW_OUTPUT))):
+ self.assertDictEqual(lsusb.lsusb().pop(), EXPECTED_RESULT)
+
+ def testGetSerial(self):
+ with self.assertCalls(
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb'], timeout=10), (None, DEVICE_LIST)),
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb', '-v', '-s', '003:007'], timeout=10), (None, RAW_OUTPUT))):
+ self.assertEqual(lsusb.get_android_devices(), ['01d2450ea194a93b'])
+
+ def testGetLsusbSerial(self):
+ with self.assertCalls(
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb'], timeout=10), (None, DEVICE_LIST)),
+ (mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutputWithTimeout(
+ ['lsusb', '-v', '-s', '003:007'], timeout=10), (None, RAW_OUTPUT))):
+ out = lsusb.lsusb().pop()
+ self.assertEqual(lsusb.get_lsusb_serial(out), '01d2450ea194a93b')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/third_party/catapult/devil/devil/utils/markdown.py b/third_party/catapult/devil/devil/utils/markdown.py
new file mode 100755
index 0000000000..54e7ed5629
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/markdown.py
@@ -0,0 +1,320 @@
+#! /usr/bin/env python
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import argparse
+import imp
+import os
+import re
+import sys
+import textwrap
+import types
+
+# A markdown code block template: https://goo.gl/9EsyRi
+_CODE_BLOCK_FORMAT = '''```{language}
+{code}
+```
+'''
+
+_DEVIL_ROOT = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '..', '..'))
+
+
+def md_bold(raw_text):
+ """Returns markdown-formatted bold text."""
+ return '**%s**' % md_escape(raw_text, characters='*')
+
+
+def md_code(raw_text, language):
+ """Returns a markdown-formatted code block in the given language."""
+ return _CODE_BLOCK_FORMAT.format(
+ language=language or '',
+ code=md_escape(raw_text, characters='`'))
+
+
+def md_escape(raw_text, characters='*_'):
+ """Escapes * and _."""
+ def escape_char(m):
+ return '\\%s' % m.group(0)
+ pattern = '[%s]' % re.escape(characters)
+ return re.sub(pattern, escape_char, raw_text)
+
+
+def md_heading(raw_text, level):
+ """Returns markdown-formatted heading."""
+ adjusted_level = min(max(level, 0), 6)
+ return '%s%s%s' % (
+ '#' * adjusted_level, ' ' if adjusted_level > 0 else '', raw_text)
+
+
+def md_inline_code(raw_text):
+ """Returns markdown-formatted inline code."""
+ return '`%s`' % md_escape(raw_text, characters='`')
+
+
+def md_italic(raw_text):
+ """Returns markdown-formatted italic text."""
+ return '*%s*' % md_escape(raw_text, characters='*')
+
+
+def md_link(link_text, link_target):
+ """returns a markdown-formatted link."""
+ return '[%s](%s)' % (
+ md_escape(link_text, characters=']'),
+ md_escape(link_target, characters=')'))
+
+
+class MarkdownHelpFormatter(argparse.HelpFormatter):
+ """A really bare-bones argparse help formatter that generates valid markdown.
+
+ This will generate something like:
+
+ usage
+
+ # **section heading**:
+
+ ## **--argument-one**
+
+ ```
+ argument-one help text
+ ```
+
+ """
+
+ #override
+ def _format_usage(self, usage, actions, groups, prefix):
+ usage_text = super(MarkdownHelpFormatter, self)._format_usage(
+ usage, actions, groups, prefix)
+ return md_code(usage_text, language=None)
+
+ #override
+ def format_help(self):
+ self._root_section.heading = md_heading(self._prog, level=1)
+ return super(MarkdownHelpFormatter, self).format_help()
+
+ #override
+ def start_section(self, heading):
+ super(MarkdownHelpFormatter, self).start_section(
+ md_heading(heading, level=2))
+
+ #override
+ def _format_action(self, action):
+ lines = []
+ action_header = self._format_action_invocation(action)
+ lines.append(md_heading(action_header, level=3))
+ if action.help:
+ lines.append(md_code(self._expand_help(action), language=None))
+ lines.extend(['', ''])
+ return '\n'.join(lines)
+
+
+class MarkdownHelpAction(argparse.Action):
+ def __init__(self, option_strings,
+ dest=argparse.SUPPRESS, default=argparse.SUPPRESS,
+ **kwargs):
+ super(MarkdownHelpAction, self).__init__(
+ option_strings=option_strings,
+ dest=dest,
+ default=default,
+ nargs=0,
+ **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ parser.formatter_class = MarkdownHelpFormatter
+ parser.print_help()
+ parser.exit()
+
+
+def add_md_help_argument(parser):
+ """Adds --md-help to the given argparse.ArgumentParser.
+
+ Running a script with --md-help will print the help text for that script
+ as valid markdown.
+
+ Args:
+ parser: The ArgumentParser to which --md-help should be added.
+ """
+ parser.add_argument('--md-help', action=MarkdownHelpAction,
+ help='print Markdown-formatted help text and exit.')
+
+
+def load_module_from_path(module_path):
+ """Load a module given only the path name.
+
+ Also loads package modules as necessary.
+
+ Args:
+ module_path: An absolute path to a python module.
+ Returns:
+ The module object for the given path.
+ """
+ module_names = [os.path.splitext(os.path.basename(module_path))[0]]
+ d = os.path.dirname(module_path)
+
+ while os.path.exists(os.path.join(d, '__init__.py')):
+ module_names.append(os.path.basename(d))
+ d = os.path.dirname(d)
+
+ d = [d]
+
+ module = None
+ full_module_name = ''
+ for package_name in reversed(module_names):
+ if module:
+ d = module.__path__
+ full_module_name += '.'
+ r = imp.find_module(package_name, d)
+ full_module_name += package_name
+ module = imp.load_module(full_module_name, *r)
+ return module
+
+
+def md_module(module_obj, module_path=None, module_link=None):
+ """Write markdown documentation for a class.
+
+ Documents public classes and functions.
+
+ Args:
+ class_obj: a types.TypeType object for the class that should be
+ documented.
+ Returns:
+ A list of markdown-formatted lines.
+ """
+ def should_doc(name):
+ return (type(module_obj.__dict__[name]) != types.ModuleType
+ and not name.startswith('_'))
+
+ stuff_to_doc = sorted(
+ obj for name, obj in module_obj.__dict__.iteritems()
+ if should_doc(name))
+
+ classes_to_doc = []
+ functions_to_doc = []
+
+ for s in stuff_to_doc:
+ if type(s) == types.TypeType:
+ classes_to_doc.append(s)
+ elif type(s) == types.FunctionType:
+ functions_to_doc.append(s)
+
+ command = ['devil/utils/markdown.py']
+ if module_link:
+ command.extend(['--module-link', module_link])
+ if module_path:
+ command.append(os.path.relpath(module_path, _DEVIL_ROOT))
+
+ heading_text = module_obj.__name__
+ if module_link:
+ heading_text = md_link(heading_text, module_link)
+
+ content = [
+ md_heading(heading_text, level=1),
+ '',
+ md_italic('This page was autogenerated by %s'
+ % md_inline_code(' '.join(command))),
+ '',
+ ]
+
+ for c in classes_to_doc:
+ content += md_class(c)
+ for f in functions_to_doc:
+ content += md_function(f)
+
+ print '\n'.join(content)
+
+ return 0
+
+
+def md_class(class_obj):
+ """Write markdown documentation for a class.
+
+ Documents public methods. Does not currently document subclasses.
+
+ Args:
+ class_obj: a types.TypeType object for the class that should be
+ documented.
+ Returns:
+ A list of markdown-formatted lines.
+ """
+ content = [md_heading(md_escape(class_obj.__name__), level=2)]
+ content.append('')
+ if class_obj.__doc__:
+ content.extend(md_docstring(class_obj.__doc__))
+
+ def should_doc(name, obj):
+ return (type(obj) == types.FunctionType
+ and (name.startswith('__') or not name.startswith('_')))
+
+ methods_to_doc = sorted(
+ obj for name, obj in class_obj.__dict__.iteritems()
+ if should_doc(name, obj))
+
+ for m in methods_to_doc:
+ content.extend(md_function(m, class_obj=class_obj))
+
+ return content
+
+
+def md_docstring(docstring):
+ """Write a markdown-formatted docstring.
+
+ Returns:
+ A list of markdown-formatted lines.
+ """
+ content = []
+ lines = textwrap.dedent(docstring).splitlines()
+ content.append(md_escape(lines[0]))
+ lines = lines[1:]
+ while lines and (not lines[0] or lines[0].isspace()):
+ lines = lines[1:]
+
+ if not all(l.isspace() for l in lines):
+ content.append(md_code('\n'.join(lines), language=None))
+ content.append('')
+ return content
+
+
+def md_function(func_obj, class_obj=None):
+ """Write markdown documentation for a function.
+
+ Args:
+ func_obj: a types.FunctionType object for the function that should be
+ documented.
+ Returns:
+ A list of markdown-formatted lines.
+ """
+ if class_obj:
+ heading_text = '%s.%s' % (class_obj.__name__, func_obj.__name__)
+ else:
+ heading_text = func_obj.__name__
+ content = [md_heading(md_escape(heading_text), level=3)]
+ content.append('')
+
+ if func_obj.__doc__:
+ content.extend(md_docstring(func_obj.__doc__))
+
+ return content
+
+
+def main(raw_args):
+ """Write markdown documentation for the module at the provided path.
+
+ Args:
+ raw_args: the raw command-line args. Usually sys.argv[1:].
+ Returns:
+ An integer exit code. 0 for success, non-zero for failure.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--module-link')
+ parser.add_argument('module_path', type=os.path.realpath)
+ args = parser.parse_args(raw_args)
+
+ return md_module(
+ load_module_from_path(args.module_path),
+ module_link=args.module_link)
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv[1:]))
+
diff --git a/third_party/catapult/devil/devil/utils/markdown_test.py b/third_party/catapult/devil/devil/utils/markdown_test.py
new file mode 100755
index 0000000000..323776ca1a
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/markdown_test.py
@@ -0,0 +1,121 @@
+#! /usr/bin/env python
+# Copyright 2017 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import sys
+import textwrap
+import unittest
+
+if __name__ == '__main__':
+ sys.path.append(
+ os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
+
+from devil.utils import markdown
+
+
+class MarkdownTest(unittest.TestCase):
+
+ def testBold(self):
+ raw = 'foo'
+ self.assertEquals('**foo**', markdown.md_bold(raw))
+
+ def testBoldContainsStars(self):
+ raw = '*foo*'
+ self.assertEquals('**\\*foo\\***', markdown.md_bold(raw))
+
+ def testCode(self):
+ raw = textwrap.dedent("""\
+ class MarkdownTest(unittest.TestCase):
+ def testCode(self):
+ pass""")
+
+ expected = textwrap.dedent("""\
+ ```python
+ class MarkdownTest(unittest.TestCase):
+ def testCode(self):
+ pass
+ ```
+ """)
+ actual = markdown.md_code(raw, language='python')
+ self.assertEquals(expected, actual)
+
+ def testCodeContainsTicks(self):
+ raw = textwrap.dedent("""\
+ This is sample markdown.
+ ```c
+ // This is a sample code block.
+ int main(int argc, char** argv) {
+ return 0;
+ }
+ ```""")
+
+ expected = textwrap.dedent("""\
+ ```
+ This is sample markdown.
+ \\`\\`\\`c
+ // This is a sample code block.
+ int main(int argc, char** argv) {
+ return 0;
+ }
+ \\`\\`\\`
+ ```
+ """)
+ actual = markdown.md_code(raw, language=None)
+ self.assertEquals(expected, actual)
+
+ def testEscape(self):
+ raw = 'text_with_underscores *and stars*'
+ expected = 'text\\_with\\_underscores \\*and stars\\*'
+ actual = markdown.md_escape(raw)
+ self.assertEquals(expected, actual)
+
+ def testHeading1(self):
+ raw = 'Heading 1'
+ self.assertEquals('# Heading 1', markdown.md_heading(raw, level=1))
+
+ def testHeading5(self):
+ raw = 'Heading 5'
+ self.assertEquals('##### Heading 5', markdown.md_heading(raw, level=5))
+
+ def testHeading10(self):
+ raw = 'Heading 10'
+ self.assertEquals('###### Heading 10', markdown.md_heading(raw, level=10))
+
+ def testInlineCode(self):
+ raw = 'devil.utils.markdown_test'
+ self.assertEquals(
+ '`devil.utils.markdown_test`', markdown.md_inline_code(raw))
+
+ def testInlineCodeContainsTicks(self):
+ raw = 'this contains `backticks`'
+ self.assertEquals(
+ '`this contains \\`backticks\\``', markdown.md_inline_code(raw))
+
+ def testItalic(self):
+ raw = 'bar'
+ self.assertEquals('*bar*', markdown.md_italic(raw))
+
+ def testItalicContainsStars(self):
+ raw = '*bar*'
+ self.assertEquals('*\\*bar\\**', markdown.md_italic(raw))
+
+ def testLink(self):
+ link_text = 'Devil home'
+ link_target = (
+ 'https://github.com/catapult-project/catapult/tree/master/devil')
+ expected = (
+ '[Devil home]'
+ '(https://github.com/catapult-project/catapult/tree/master/devil)')
+ self.assertEquals(expected, markdown.md_link(link_text, link_target))
+
+ def testLinkTextContainsBracket(self):
+ link_text = 'foo [] bar'
+ link_target = 'https://www.google.com'
+ expected = '[foo [\\] bar](https://www.google.com)'
+ self.assertEquals(expected, markdown.md_link(link_text, link_target))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/third_party/catapult/devil/devil/utils/mock_calls.py b/third_party/catapult/devil/devil/utils/mock_calls.py
new file mode 100644
index 0000000000..5ae951e37d
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/mock_calls.py
@@ -0,0 +1,180 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+A test facility to assert call sequences while mocking their behavior.
+"""
+
+import unittest
+
+from devil import devil_env
+
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
+
+class TestCase(unittest.TestCase):
+ """Adds assertCalls to TestCase objects."""
+ class _AssertCalls(object):
+
+ def __init__(self, test_case, expected_calls, watched):
+ def call_action(pair):
+ if isinstance(pair, type(mock.call)):
+ return (pair, None)
+ else:
+ return pair
+
+ def do_check(call):
+ def side_effect(*args, **kwargs):
+ received_call = call(*args, **kwargs)
+ self._test_case.assertTrue(
+ self._expected_calls,
+ msg=('Unexpected call: %s' % str(received_call)))
+ expected_call, action = self._expected_calls.pop(0)
+ self._test_case.assertTrue(
+ received_call == expected_call,
+ msg=('Expected call mismatch:\n'
+ ' expected: %s\n'
+ ' received: %s\n'
+ % (str(expected_call), str(received_call))))
+ if callable(action):
+ return action(*args, **kwargs)
+ else:
+ return action
+ return side_effect
+
+ self._test_case = test_case
+ self._expected_calls = [call_action(pair) for pair in expected_calls]
+ watched = watched.copy() # do not pollute the caller's dict
+ watched.update((call.parent.name, call.parent)
+ for call, _ in self._expected_calls)
+ self._patched = [test_case.patch_call(call, side_effect=do_check(call))
+ for call in watched.itervalues()]
+
+ def __enter__(self):
+ for patch in self._patched:
+ patch.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for patch in self._patched:
+ patch.__exit__(exc_type, exc_val, exc_tb)
+ if exc_type is None:
+ missing = ''.join(' expected: %s\n' % str(call)
+ for call, _ in self._expected_calls)
+ self._test_case.assertFalse(
+ missing,
+ msg='Expected calls not found:\n' + missing)
+
+ def __init__(self, *args, **kwargs):
+ super(TestCase, self).__init__(*args, **kwargs)
+ self.call = mock.call.self
+ self._watched = {}
+
+ def call_target(self, call):
+ """Resolve a self.call instance to the target it represents.
+
+ Args:
+ call: a self.call instance, e.g. self.call.adb.Shell
+
+ Returns:
+ The target object represented by the call, e.g. self.adb.Shell
+
+ Raises:
+ ValueError if the path of the call does not start with "self", i.e. the
+ target of the call is external to the self object.
+ AttributeError if the path of the call does not specify a valid
+ chain of attributes (without any calls) starting from "self".
+ """
+ path = call.name.split('.')
+ if path.pop(0) != 'self':
+ raise ValueError("Target %r outside of 'self' object" % call.name)
+ target = self
+ for attr in path:
+ target = getattr(target, attr)
+ return target
+
+ def patch_call(self, call, **kwargs):
+ """Patch the target of a mock.call instance.
+
+ Args:
+ call: a mock.call instance identifying a target to patch
+ Extra keyword arguments are processed by mock.patch
+
+ Returns:
+ A context manager to mock/unmock the target of the call
+ """
+ if call.name.startswith('self.'):
+ target = self.call_target(call.parent)
+ _, attribute = call.name.rsplit('.', 1)
+ if (hasattr(type(target), attribute)
+ and isinstance(getattr(type(target), attribute), property)):
+ return mock.patch.object(
+ type(target), attribute, new_callable=mock.PropertyMock, **kwargs)
+ else:
+ return mock.patch.object(target, attribute, **kwargs)
+ else:
+ return mock.patch(call.name, **kwargs)
+
+ def watchCalls(self, calls):
+ """Add calls to the set of watched calls.
+
+ Args:
+ calls: a sequence of mock.call instances identifying targets to watch
+ """
+ self._watched.update((call.name, call) for call in calls)
+
+ def watchMethodCalls(self, call, ignore=None):
+ """Watch all public methods of the target identified by a self.call.
+
+ Args:
+ call: a self.call instance indetifying an object
+ ignore: a list of public methods to ignore when watching for calls
+ """
+ target = self.call_target(call)
+ if ignore is None:
+ ignore = []
+ self.watchCalls(getattr(call, method)
+ for method in dir(target.__class__)
+ if not method.startswith('_') and not method in ignore)
+
+ def clearWatched(self):
+ """Clear the set of watched calls."""
+ self._watched = {}
+
+ def assertCalls(self, *calls):
+ """A context manager to assert that a sequence of calls is made.
+
+ During the assertion, a number of functions and methods will be "watched",
+ and any calls made to them is expected to appear---in the exact same order,
+ and with the exact same arguments---as specified by the argument |calls|.
+
+ By default, the targets of all expected calls are watched. Further targets
+ to watch may be added using watchCalls and watchMethodCalls.
+
+ Optionaly, each call may be accompanied by an action. If the action is a
+ (non-callable) value, this value will be used as the return value given to
+ the caller when the matching call is found. Alternatively, if the action is
+ a callable, the action will be then called with the same arguments as the
+ intercepted call, so that it can provide a return value or perform other
+ side effects. If the action is missing, a return value of None is assumed.
+
+ Note that mock.Mock objects are often convenient to use as a callable
+ action, e.g. to raise exceptions or return other objects which are
+ themselves callable.
+
+ Args:
+ calls: each argument is either a pair (expected_call, action) or just an
+ expected_call, where expected_call is a mock.call instance.
+
+ Raises:
+ AssertionError if the watched targets do not receive the exact sequence
+ of calls specified. Missing calls, extra calls, and calls with
+ mismatching arguments, all cause the assertion to fail.
+ """
+ return self._AssertCalls(self, calls, self._watched)
+
+ def assertCall(self, call, action=None):
+ return self.assertCalls((call, action))
+
diff --git a/third_party/catapult/devil/devil/utils/mock_calls_test.py b/third_party/catapult/devil/devil/utils/mock_calls_test.py
new file mode 100755
index 0000000000..8eb4fc9da4
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/mock_calls_test.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Unit tests for the contents of mock_calls.py.
+"""
+
+import logging
+import os
+import unittest
+
+from devil import devil_env
+from devil.android.sdk import version_codes
+from devil.utils import mock_calls
+
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
+
+class _DummyAdb(object):
+
+ def __str__(self):
+ return '0123456789abcdef'
+
+ def Push(self, host_path, device_path):
+ logging.debug('(device %s) pushing %r to %r', self, host_path, device_path)
+
+ def IsOnline(self):
+ logging.debug('(device %s) checking device online', self)
+ return True
+
+ def Shell(self, cmd):
+ logging.debug('(device %s) running command %r', self, cmd)
+ return "nice output\n"
+
+ def Reboot(self):
+ logging.debug('(device %s) rebooted!', self)
+
+ @property
+ def build_version_sdk(self):
+ logging.debug('(device %s) getting build_version_sdk', self)
+ return version_codes.LOLLIPOP
+
+
+class TestCaseWithAssertCallsTest(mock_calls.TestCase):
+
+ def setUp(self):
+ self.adb = _DummyAdb()
+
+ def ShellError(self):
+ def action(cmd):
+ raise ValueError('(device %s) command %r is not nice' % (self.adb, cmd))
+ return action
+
+ def get_answer(self):
+ logging.debug("called 'get_answer' of %r object", self)
+ return 42
+
+ def echo(self, thing):
+ logging.debug("called 'echo' of %r object", self)
+ return thing
+
+ def testCallTarget_succeds(self):
+ self.assertEquals(self.adb.Shell,
+ self.call_target(self.call.adb.Shell))
+
+ def testCallTarget_failsExternal(self):
+ with self.assertRaises(ValueError):
+ self.call_target(mock.call.sys.getcwd)
+
+ def testCallTarget_failsUnknownAttribute(self):
+ with self.assertRaises(AttributeError):
+ self.call_target(self.call.adb.Run)
+
+ def testCallTarget_failsIntermediateCalls(self):
+ with self.assertRaises(AttributeError):
+ self.call_target(self.call.adb.RunShell('cmd').append)
+
+ def testPatchCall_method(self):
+ self.assertEquals(42, self.get_answer())
+ with self.patch_call(self.call.get_answer, return_value=123):
+ self.assertEquals(123, self.get_answer())
+ self.assertEquals(42, self.get_answer())
+
+ def testPatchCall_attribute_method(self):
+ with self.patch_call(self.call.adb.Shell, return_value='hello'):
+ self.assertEquals('hello', self.adb.Shell('echo hello'))
+
+ def testPatchCall_global(self):
+ with self.patch_call(mock.call.os.getcwd, return_value='/some/path'):
+ self.assertEquals('/some/path', os.getcwd())
+
+ def testPatchCall_withSideEffect(self):
+ with self.patch_call(self.call.adb.Shell, side_effect=ValueError):
+ with self.assertRaises(ValueError):
+ self.adb.Shell('echo hello')
+
+ def testPatchCall_property(self):
+ self.assertEquals(version_codes.LOLLIPOP, self.adb.build_version_sdk)
+ with self.patch_call(
+ self.call.adb.build_version_sdk,
+ return_value=version_codes.KITKAT):
+ self.assertEquals(version_codes.KITKAT, self.adb.build_version_sdk)
+ self.assertEquals(version_codes.LOLLIPOP, self.adb.build_version_sdk)
+
+ def testAssertCalls_succeeds_simple(self):
+ self.assertEquals(42, self.get_answer())
+ with self.assertCall(self.call.get_answer(), 123):
+ self.assertEquals(123, self.get_answer())
+ self.assertEquals(42, self.get_answer())
+
+ def testAssertCalls_succeeds_multiple(self):
+ with self.assertCalls(
+ (mock.call.os.getcwd(), '/some/path'),
+ (self.call.echo('hello'), 'hello'),
+ (self.call.get_answer(), 11),
+ self.call.adb.Push('this_file', 'that_file'),
+ (self.call.get_answer(), 12)):
+ self.assertEquals(os.getcwd(), '/some/path')
+ self.assertEquals('hello', self.echo('hello'))
+ self.assertEquals(11, self.get_answer())
+ self.adb.Push('this_file', 'that_file')
+ self.assertEquals(12, self.get_answer())
+
+ def testAsserCalls_succeeds_withAction(self):
+ with self.assertCall(
+ self.call.adb.Shell('echo hello'), self.ShellError()):
+ with self.assertRaises(ValueError):
+ self.adb.Shell('echo hello')
+
+ def testAssertCalls_fails_tooManyCalls(self):
+ with self.assertRaises(AssertionError):
+ with self.assertCalls(self.call.adb.IsOnline()):
+ self.adb.IsOnline()
+ self.adb.IsOnline()
+
+ def testAssertCalls_fails_tooFewCalls(self):
+ with self.assertRaises(AssertionError):
+ with self.assertCalls(self.call.adb.IsOnline()):
+ pass
+
+ def testAssertCalls_succeeds_extraCalls(self):
+ # we are not watching Reboot, so the assertion succeeds
+ with self.assertCalls(self.call.adb.IsOnline()):
+ self.adb.IsOnline()
+ self.adb.Reboot()
+
+ def testAssertCalls_fails_extraCalls(self):
+ self.watchCalls([self.call.adb.Reboot])
+ # this time we are also watching Reboot, so the assertion fails
+ with self.assertRaises(AssertionError):
+ with self.assertCalls(self.call.adb.IsOnline()):
+ self.adb.IsOnline()
+ self.adb.Reboot()
+
+ def testAssertCalls_succeeds_NoCalls(self):
+ self.watchMethodCalls(self.call.adb) # we are watching all adb methods
+ with self.assertCalls():
+ pass
+
+ def testAssertCalls_fails_NoCalls(self):
+ self.watchMethodCalls(self.call.adb)
+ with self.assertRaises(AssertionError):
+ with self.assertCalls():
+ self.adb.IsOnline()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.DEBUG)
+ unittest.main(verbosity=2)
+
diff --git a/third_party/catapult/devil/devil/utils/parallelizer.py b/third_party/catapult/devil/devil/utils/parallelizer.py
new file mode 100644
index 0000000000..35995251c8
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/parallelizer.py
@@ -0,0 +1,238 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+""" Wrapper that allows method execution in parallel.
+
+This class wraps a list of objects of the same type, emulates their
+interface, and executes any functions called on the objects in parallel
+in ReraiserThreads.
+
+This means that, given a list of objects:
+
+ class Foo:
+ def __init__(self):
+ self.baz = Baz()
+
+ def bar(self, my_param):
+ // do something
+
+ list_of_foos = [Foo(1), Foo(2), Foo(3)]
+
+we can take a sequential operation on that list of objects:
+
+ for f in list_of_foos:
+ f.bar('Hello')
+
+and run it in parallel across all of the objects:
+
+ Parallelizer(list_of_foos).bar('Hello')
+
+It can also handle (non-method) attributes of objects, so that this:
+
+ for f in list_of_foos:
+ f.baz.myBazMethod()
+
+can be run in parallel with:
+
+ Parallelizer(list_of_foos).baz.myBazMethod()
+
+Because it emulates the interface of the wrapped objects, a Parallelizer
+can be passed to a method or function that takes objects of that type:
+
+ def DoesSomethingWithFoo(the_foo):
+ the_foo.bar('Hello')
+ the_foo.bar('world')
+ the_foo.baz.myBazMethod
+
+ DoesSomethingWithFoo(Parallelizer(list_of_foos))
+
+Note that this class spins up a thread for each object. Using this class
+to parallelize operations that are already fast will incur a net performance
+penalty.
+
+"""
+# pylint: disable=protected-access
+
+from devil.utils import reraiser_thread
+from devil.utils import watchdog_timer
+
+_DEFAULT_TIMEOUT = 30
+_DEFAULT_RETRIES = 3
+
+
+class Parallelizer(object):
+ """Allows parallel execution of method calls across a group of objects."""
+
+ def __init__(self, objs):
+ self._orig_objs = objs
+ self._objs = objs
+
+ def __getattr__(self, name):
+ """Emulate getting the |name| attribute of |self|.
+
+ Args:
+ name: The name of the attribute to retrieve.
+ Returns:
+ A Parallelizer emulating the |name| attribute of |self|.
+ """
+ self.pGet(None)
+
+ r = type(self)(self._orig_objs)
+ r._objs = [getattr(o, name) for o in self._objs]
+ return r
+
+ def __getitem__(self, index):
+ """Emulate getting the value of |self| at |index|.
+
+ Returns:
+ A Parallelizer emulating the value of |self| at |index|.
+ """
+ self.pGet(None)
+
+ r = type(self)(self._orig_objs)
+ r._objs = [o[index] for o in self._objs]
+ return r
+
+ def __call__(self, *args, **kwargs):
+ """Emulate calling |self| with |args| and |kwargs|.
+
+ Note that this call is asynchronous. Call pFinish on the return value to
+ block until the call finishes.
+
+ Returns:
+ A Parallelizer wrapping the ReraiserThreadGroup running the call in
+ parallel.
+ Raises:
+ AttributeError if the wrapped objects aren't callable.
+ """
+ self.pGet(None)
+
+ for o in self._objs:
+ if not callable(o):
+ raise AttributeError("'%s' is not callable" % o.__name__)
+
+ r = type(self)(self._orig_objs)
+ r._objs = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(
+ o, args=args, kwargs=kwargs,
+ name='%s.%s' % (str(d), o.__name__))
+ for d, o in zip(self._orig_objs, self._objs)])
+ r._objs.StartAll() # pylint: disable=W0212
+ return r
+
+ def pFinish(self, timeout):
+ """Finish any outstanding asynchronous operations.
+
+ Args:
+ timeout: The maximum number of seconds to wait for an individual
+ result to return, or None to wait forever.
+ Returns:
+ self, now emulating the return values.
+ """
+ self._assertNoShadow('pFinish')
+ if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
+ self._objs.JoinAll()
+ self._objs = self._objs.GetAllReturnValues(
+ watchdog_timer.WatchdogTimer(timeout))
+ return self
+
+ def pGet(self, timeout):
+ """Get the current wrapped objects.
+
+ Args:
+ timeout: Same as |pFinish|.
+ Returns:
+ A list of the results, in order of the provided devices.
+ Raises:
+ Any exception raised by any of the called functions.
+ """
+ self._assertNoShadow('pGet')
+ self.pFinish(timeout)
+ return self._objs
+
+ def pMap(self, f, *args, **kwargs):
+ """Map a function across the current wrapped objects in parallel.
+
+ This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
+
+ Note that this call is asynchronous. Call pFinish on the return value to
+ block until the call finishes.
+
+ Args:
+ f: The function to call.
+ args: The positional args to pass to f.
+ kwargs: The keyword args to pass to f.
+ Returns:
+ A Parallelizer wrapping the ReraiserThreadGroup running the map in
+ parallel.
+ """
+ self._assertNoShadow('pMap')
+ r = type(self)(self._orig_objs)
+ r._objs = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(
+ f, args=tuple([o] + list(args)), kwargs=kwargs,
+ name='%s(%s)' % (f.__name__, d))
+ for d, o in zip(self._orig_objs, self._objs)])
+ r._objs.StartAll() # pylint: disable=W0212
+ return r
+
+ def _assertNoShadow(self, attr_name):
+ """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
+
+ If the wrapped objects _do_ have an |attr_name| attribute, it will be
+ inaccessible to clients.
+
+ Args:
+ attr_name: The attribute to check.
+ Raises:
+ AssertionError if the wrapped objects have an attribute named 'attr_name'
+ or '_assertNoShadow'.
+ """
+ if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
+ assert not hasattr(self._objs, '_assertNoShadow')
+ assert not hasattr(self._objs, attr_name)
+ else:
+ assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
+ assert not any(hasattr(o, attr_name) for o in self._objs)
+
+
+class SyncParallelizer(Parallelizer):
+ """A Parallelizer that blocks on function calls."""
+
+ # override
+ def __call__(self, *args, **kwargs):
+ """Emulate calling |self| with |args| and |kwargs|.
+
+ Note that this call is synchronous.
+
+ Returns:
+ A Parallelizer emulating the value returned from calling |self| with
+ |args| and |kwargs|.
+ Raises:
+ AttributeError if the wrapped objects aren't callable.
+ """
+ r = super(SyncParallelizer, self).__call__(*args, **kwargs)
+ r.pFinish(None)
+ return r
+
+ # override
+ def pMap(self, f, *args, **kwargs):
+ """Map a function across the current wrapped objects in parallel.
+
+ This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
+
+ Note that this call is synchronous.
+
+ Args:
+ f: The function to call.
+ args: The positional args to pass to f.
+ kwargs: The keyword args to pass to f.
+ Returns:
+ A Parallelizer wrapping the ReraiserThreadGroup running the map in
+ parallel.
+ """
+ r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
+ r.pFinish(None)
+ return r
+
diff --git a/third_party/catapult/devil/devil/utils/parallelizer_test.py b/third_party/catapult/devil/devil/utils/parallelizer_test.py
new file mode 100644
index 0000000000..32ff7ec547
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/parallelizer_test.py
@@ -0,0 +1,162 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for the contents of parallelizer.py."""
+
+# pylint: disable=W0212
+# pylint: disable=W0613
+
+import os
+import tempfile
+import time
+import unittest
+
+from devil.utils import parallelizer
+
+
+class ParallelizerTestObject(object):
+ """Class used to test parallelizer.Parallelizer."""
+
+ parallel = parallelizer.Parallelizer
+
+ def __init__(self, thing, completion_file_name=None):
+ self._thing = thing
+ self._completion_file_name = completion_file_name
+ self.helper = ParallelizerTestObjectHelper(thing)
+
+ @staticmethod
+ def doReturn(what):
+ return what
+
+ @classmethod
+ def doRaise(cls, what):
+ raise what
+
+ def doSetTheThing(self, new_thing):
+ self._thing = new_thing
+
+ def doReturnTheThing(self):
+ return self._thing
+
+ def doRaiseTheThing(self):
+ raise self._thing
+
+ def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
+ if isinstance(self._thing, Exception):
+ raise self._thing
+ time.sleep(sleep_duration)
+ self._write_completion_file()
+ return self._thing
+
+ def _write_completion_file(self):
+ if self._completion_file_name and len(self._completion_file_name):
+ with open(self._completion_file_name, 'w+b') as completion_file:
+ completion_file.write('complete')
+
+ def __getitem__(self, index):
+ return self._thing[index]
+
+ def __str__(self):
+ return type(self).__name__
+
+
+class ParallelizerTestObjectHelper(object):
+
+ def __init__(self, thing):
+ self._thing = thing
+
+ def doReturnStringThing(self):
+ return str(self._thing)
+
+
+class ParallelizerTest(unittest.TestCase):
+
+ def testInitEmptyList(self):
+ r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
+ self.assertEquals([], r)
+
+ def testMethodCall(self):
+ test_data = ['abc_foo', 'def_foo', 'ghi_foo']
+ expected = ['abc_bar', 'def_bar', 'ghi_bar']
+ r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
+ self.assertEquals(expected, r)
+
+ def testMutate(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ self.assertTrue(all(d.doReturnTheThing() for d in devices))
+ ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
+ self.assertTrue(not any(d.doReturnTheThing() for d in devices))
+
+ def testAllReturn(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ results = ParallelizerTestObject.parallel(
+ devices).doReturnTheThing().pGet(1)
+ self.assertTrue(isinstance(results, list))
+ self.assertEquals(10, len(results))
+ self.assertTrue(all(results))
+
+ def testAllRaise(self):
+ devices = [ParallelizerTestObject(Exception('thing %d' % i))
+ for i in xrange(0, 10)]
+ p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
+ with self.assertRaises(Exception):
+ p.pGet(1)
+
+ def testOneFailOthersComplete(self):
+ parallel_device_count = 10
+ exception_index = 7
+ exception_msg = 'thing %d' % exception_index
+
+ try:
+ completion_files = [tempfile.NamedTemporaryFile(delete=False)
+ for _ in xrange(0, parallel_device_count)]
+ devices = [
+ ParallelizerTestObject(
+ i if i != exception_index else Exception(exception_msg),
+ completion_files[i].name)
+ for i in xrange(0, parallel_device_count)]
+ for f in completion_files:
+ f.close()
+ p = ParallelizerTestObject.parallel(devices)
+ with self.assertRaises(Exception) as e:
+ p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
+ self.assertTrue(exception_msg in str(e.exception))
+ for i in xrange(0, parallel_device_count):
+ with open(completion_files[i].name) as f:
+ if i == exception_index:
+ self.assertEquals('', f.read())
+ else:
+ self.assertEquals('complete', f.read())
+ finally:
+ for f in completion_files:
+ os.remove(f.name)
+
+ def testReusable(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ p = ParallelizerTestObject.parallel(devices)
+ results = p.doReturn(True).pGet(1)
+ self.assertTrue(all(results))
+ results = p.doReturn(True).pGet(1)
+ self.assertTrue(all(results))
+ with self.assertRaises(Exception):
+ results = p.doRaise(Exception('reusableTest')).pGet(1)
+
+ def testContained(self):
+ devices = [ParallelizerTestObject(i) for i in xrange(0, 10)]
+ results = (ParallelizerTestObject.parallel(devices).helper
+ .doReturnStringThing().pGet(1))
+ self.assertTrue(isinstance(results, list))
+ self.assertEquals(10, len(results))
+ for i in xrange(0, 10):
+ self.assertEquals(str(i), results[i])
+
+ def testGetItem(self):
+ devices = [ParallelizerTestObject(range(i, i + 10)) for i in xrange(0, 10)]
+ results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
+ self.assertEquals(range(9, 19), results)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
+
diff --git a/third_party/catapult/devil/devil/utils/reraiser_thread.py b/third_party/catapult/devil/devil/utils/reraiser_thread.py
new file mode 100644
index 0000000000..56d95f3937
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/reraiser_thread.py
@@ -0,0 +1,228 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Thread and ThreadGroup that reraise exceptions on the main thread."""
+# pylint: disable=W0212
+
+import logging
+import sys
+import threading
+import time
+import traceback
+
+from devil.utils import watchdog_timer
+
+
+class TimeoutError(Exception):
+ """Module-specific timeout exception."""
+ pass
+
+
+def LogThreadStack(thread, error_log_func=logging.critical):
+ """Log the stack for the given thread.
+
+ Args:
+ thread: a threading.Thread instance.
+ error_log_func: Logging function when logging errors.
+ """
+ stack = sys._current_frames()[thread.ident]
+ error_log_func('*' * 80)
+ error_log_func('Stack dump for thread %r', thread.name)
+ error_log_func('*' * 80)
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ error_log_func('File: "%s", line %d, in %s', filename, lineno, name)
+ if line:
+ error_log_func(' %s', line.strip())
+ error_log_func('*' * 80)
+
+
+class ReraiserThread(threading.Thread):
+ """Thread class that can reraise exceptions."""
+
+ def __init__(self, func, args=None, kwargs=None, name=None):
+ """Initialize thread.
+
+ Args:
+ func: callable to call on a new thread.
+ args: list of positional arguments for callable, defaults to empty.
+ kwargs: dictionary of keyword arguments for callable, defaults to empty.
+ name: thread name, defaults to Thread-N.
+ """
+ if not name and func.__name__ != '<lambda>':
+ name = func.__name__
+ super(ReraiserThread, self).__init__(name=name)
+ if not args:
+ args = []
+ if not kwargs:
+ kwargs = {}
+ self.daemon = True
+ self._func = func
+ self._args = args
+ self._kwargs = kwargs
+ self._ret = None
+ self._exc_info = None
+ self._thread_group = None
+
+ def ReraiseIfException(self):
+ """Reraise exception if an exception was raised in the thread."""
+ if self._exc_info:
+ raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+
+ def GetReturnValue(self):
+ """Reraise exception if present, otherwise get the return value."""
+ self.ReraiseIfException()
+ return self._ret
+
+ # override
+ def run(self):
+ """Overrides Thread.run() to add support for reraising exceptions."""
+ try:
+ self._ret = self._func(*self._args, **self._kwargs)
+ except: # pylint: disable=W0702
+ self._exc_info = sys.exc_info()
+
+
+class ReraiserThreadGroup(object):
+ """A group of ReraiserThread objects."""
+
+ def __init__(self, threads=None):
+ """Initialize thread group.
+
+ Args:
+ threads: a list of ReraiserThread objects; defaults to empty.
+ """
+ self._threads = []
+ # Set when a thread from one group has called JoinAll on another. It is used
+ # to detect when a there is a TimeoutRetryThread active that links to the
+ # current thread.
+ self.blocked_parent_thread_group = None
+ if threads:
+ for thread in threads:
+ self.Add(thread)
+
+ def Add(self, thread):
+ """Add a thread to the group.
+
+ Args:
+ thread: a ReraiserThread object.
+ """
+ assert thread._thread_group is None
+ thread._thread_group = self
+ self._threads.append(thread)
+
+ def StartAll(self, will_block=False):
+ """Start all threads.
+
+ Args:
+ will_block: Whether the calling thread will subsequently block on this
+ thread group. Causes the active ReraiserThreadGroup (if there is one)
+ to be marked as blocking on this thread group.
+ """
+ if will_block:
+ # Multiple threads blocking on the same outer thread should not happen in
+ # practice.
+ assert not self.blocked_parent_thread_group
+ self.blocked_parent_thread_group = CurrentThreadGroup()
+ for thread in self._threads:
+ thread.start()
+
+ def _JoinAll(self, watcher=None, timeout=None):
+ """Join all threads without stack dumps.
+
+ Reraises exceptions raised by the child threads and supports breaking
+ immediately on exceptions raised on the main thread.
+
+ Args:
+ watcher: Watchdog object providing the thread timeout. If none is
+ provided, the thread will never be timed out.
+ timeout: An optional number of seconds to wait before timing out the join
+ operation. This will not time out the threads.
+ """
+ if watcher is None:
+ watcher = watchdog_timer.WatchdogTimer(None)
+ alive_threads = self._threads[:]
+ end_time = (time.time() + timeout) if timeout else None
+ try:
+ while alive_threads and (end_time is None or end_time > time.time()):
+ for thread in alive_threads[:]:
+ if watcher.IsTimedOut():
+ raise TimeoutError('Timed out waiting for %d of %d threads.' %
+ (len(alive_threads), len(self._threads)))
+ # Allow the main thread to periodically check for interrupts.
+ thread.join(0.1)
+ if not thread.isAlive():
+ alive_threads.remove(thread)
+ # All threads are allowed to complete before reraising exceptions.
+ for thread in self._threads:
+ thread.ReraiseIfException()
+ finally:
+ self.blocked_parent_thread_group = None
+
+ def IsAlive(self):
+ """Check whether any of the threads are still alive.
+
+ Returns:
+ Whether any of the threads are still alive.
+ """
+ return any(t.isAlive() for t in self._threads)
+
+ def JoinAll(self, watcher=None, timeout=None,
+ error_log_func=logging.critical):
+ """Join all threads.
+
+ Reraises exceptions raised by the child threads and supports breaking
+ immediately on exceptions raised on the main thread. Unfinished threads'
+ stacks will be logged on watchdog timeout.
+
+ Args:
+ watcher: Watchdog object providing the thread timeout. If none is
+ provided, the thread will never be timed out.
+ timeout: An optional number of seconds to wait before timing out the join
+ operation. This will not time out the threads.
+ error_log_func: Logging function when logging errors.
+ """
+ try:
+ self._JoinAll(watcher, timeout)
+ except TimeoutError:
+ error_log_func('Timed out. Dumping threads.')
+ for thread in (t for t in self._threads if t.isAlive()):
+ LogThreadStack(thread, error_log_func=error_log_func)
+ raise
+
+ def GetAllReturnValues(self, watcher=None):
+ """Get all return values, joining all threads if necessary.
+
+ Args:
+ watcher: same as in |JoinAll|. Only used if threads are alive.
+ """
+ if any([t.isAlive() for t in self._threads]):
+ self.JoinAll(watcher)
+ return [t.GetReturnValue() for t in self._threads]
+
+
+def CurrentThreadGroup():
+ """Returns the ReraiserThreadGroup that owns the running thread.
+
+ Returns:
+ The current thread group, otherwise None.
+ """
+ current_thread = threading.current_thread()
+ if isinstance(current_thread, ReraiserThread):
+ return current_thread._thread_group # pylint: disable=no-member
+ return None
+
+
+def RunAsync(funcs, watcher=None):
+ """Executes the given functions in parallel and returns their results.
+
+ Args:
+ funcs: List of functions to perform on their own threads.
+ watcher: Watchdog object providing timeout, by default waits forever.
+
+ Returns:
+ A list of return values in the order of the given functions.
+ """
+ thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs)
+ thread_group.StartAll(will_block=True)
+ return thread_group.GetAllReturnValues(watcher=watcher)
diff --git a/third_party/catapult/devil/devil/utils/reraiser_thread_unittest.py b/third_party/catapult/devil/devil/utils/reraiser_thread_unittest.py
new file mode 100644
index 0000000000..e3c4e6bee8
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/reraiser_thread_unittest.py
@@ -0,0 +1,117 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unittests for reraiser_thread.py."""
+
+import threading
+import unittest
+
+from devil.utils import reraiser_thread
+from devil.utils import watchdog_timer
+
+
+class TestException(Exception):
+ pass
+
+
+class TestReraiserThread(unittest.TestCase):
+ """Tests for reraiser_thread.ReraiserThread."""
+
+ def testNominal(self):
+ result = [None, None]
+
+ def f(a, b=None):
+ result[0] = a
+ result[1] = b
+
+ thread = reraiser_thread.ReraiserThread(f, [1], {'b': 2})
+ thread.start()
+ thread.join()
+ self.assertEqual(result[0], 1)
+ self.assertEqual(result[1], 2)
+
+ def testRaise(self):
+ def f():
+ raise TestException
+
+ thread = reraiser_thread.ReraiserThread(f)
+ thread.start()
+ thread.join()
+ with self.assertRaises(TestException):
+ thread.ReraiseIfException()
+
+
+class TestReraiserThreadGroup(unittest.TestCase):
+ """Tests for reraiser_thread.ReraiserThreadGroup."""
+
+ def testInit(self):
+ ran = [False] * 5
+
+ def f(i):
+ ran[i] = True
+
+ group = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(f, args=[i]) for i in range(5)])
+ group.StartAll()
+ group.JoinAll()
+ for v in ran:
+ self.assertTrue(v)
+
+ def testAdd(self):
+ ran = [False] * 5
+
+ def f(i):
+ ran[i] = True
+
+ group = reraiser_thread.ReraiserThreadGroup()
+ for i in xrange(5):
+ group.Add(reraiser_thread.ReraiserThread(f, args=[i]))
+ group.StartAll()
+ group.JoinAll()
+ for v in ran:
+ self.assertTrue(v)
+
+ def testJoinRaise(self):
+ def f():
+ raise TestException
+ group = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(f) for _ in xrange(5)])
+ group.StartAll()
+ with self.assertRaises(TestException):
+ group.JoinAll()
+
+ def testJoinTimeout(self):
+ def f():
+ pass
+ event = threading.Event()
+
+ def g():
+ event.wait()
+ group = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(g),
+ reraiser_thread.ReraiserThread(f)])
+ group.StartAll()
+ with self.assertRaises(reraiser_thread.TimeoutError):
+ group.JoinAll(watchdog_timer.WatchdogTimer(0.01))
+ event.set()
+
+
+class TestRunAsync(unittest.TestCase):
+ """Tests for reraiser_thread.RunAsync."""
+
+ def testNoArgs(self):
+ results = reraiser_thread.RunAsync([])
+ self.assertEqual([], results)
+
+ def testOneArg(self):
+ results = reraiser_thread.RunAsync([lambda: 1])
+ self.assertEqual([1], results)
+
+ def testTwoArgs(self):
+ a, b = reraiser_thread.RunAsync((lambda: 1, lambda: 2))
+ self.assertEqual(1, a)
+ self.assertEqual(2, b)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/third_party/catapult/devil/devil/utils/reset_usb.py b/third_party/catapult/devil/devil/utils/reset_usb.py
new file mode 100755
index 0000000000..0335227dca
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/reset_usb.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import argparse
+import fcntl
+import logging
+import os
+import re
+import sys
+
+if __name__ == '__main__':
+ sys.path.append(
+ os.path.abspath(os.path.join(os.path.dirname(__file__),
+ '..', '..')))
+
+from devil.android import device_errors
+from devil.utils import lsusb
+from devil.utils import run_tests_helper
+
+logger = logging.getLogger(__name__)
+
+_INDENTATION_RE = re.compile(r'^( *)')
+_LSUSB_BUS_DEVICE_RE = re.compile(r'^Bus (\d{3}) Device (\d{3}):')
+_LSUSB_ENTRY_RE = re.compile(r'^ *([^ ]+) +([^ ]+) *([^ ].*)?$')
+_LSUSB_GROUP_RE = re.compile(r'^ *([^ ]+.*):$')
+
+_USBDEVFS_RESET = ord('U') << 8 | 20
+
+
+def reset_usb(bus, device):
+ """Reset the USB device with the given bus and device."""
+ usb_file_path = '/dev/bus/usb/%03d/%03d' % (bus, device)
+ with open(usb_file_path, 'w') as usb_file:
+ logger.debug('fcntl.ioctl(%s, %d)', usb_file_path, _USBDEVFS_RESET)
+ fcntl.ioctl(usb_file, _USBDEVFS_RESET)
+
+
+def reset_android_usb(serial):
+ """Reset the USB device for the given Android device."""
+ lsusb_info = lsusb.lsusb()
+
+ bus = None
+ device = None
+ for device_info in lsusb_info:
+ device_serial = lsusb.get_lsusb_serial(device_info)
+ if device_serial == serial:
+ bus = int(device_info.get('bus'))
+ device = int(device_info.get('device'))
+
+ if bus and device:
+ reset_usb(bus, device)
+ else:
+ raise device_errors.DeviceUnreachableError(
+ 'Unable to determine bus(%s) or device(%s) for device %s'
+ % (bus, device, serial))
+
+
+def reset_all_android_devices():
+ """Reset all USB devices that look like an Android device."""
+ _reset_all_matching(lambda i: bool(lsusb.get_lsusb_serial(i)))
+
+
+def _reset_all_matching(condition):
+ lsusb_info = lsusb.lsusb()
+ for device_info in lsusb_info:
+ if int(device_info.get('device')) != 1 and condition(device_info):
+ bus = int(device_info.get('bus'))
+ device = int(device_info.get('device'))
+ try:
+ reset_usb(bus, device)
+ serial = lsusb.get_lsusb_serial(device_info)
+ if serial:
+ logger.info(
+ 'Reset USB device (bus: %03d, device: %03d, serial: %s)',
+ bus, device, serial)
+ else:
+ logger.info(
+ 'Reset USB device (bus: %03d, device: %03d)',
+ bus, device)
+ except IOError:
+ logger.error(
+ 'Failed to reset USB device (bus: %03d, device: %03d)',
+ bus, device)
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='count')
+ parser.add_argument('-s', '--serial')
+ parser.add_argument('--bus', type=int)
+ parser.add_argument('--device', type=int)
+ args = parser.parse_args()
+
+ run_tests_helper.SetLogLevel(args.verbose)
+
+ if args.serial:
+ reset_android_usb(args.serial)
+ elif args.bus and args.device:
+ reset_usb(args.bus, args.device)
+ else:
+ parser.error('Unable to determine target. '
+ 'Specify --serial or BOTH --bus and --device.')
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
+
diff --git a/third_party/catapult/devil/devil/utils/run_tests_helper.py b/third_party/catapult/devil/devil/utils/run_tests_helper.py
new file mode 100644
index 0000000000..7df2da6585
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/run_tests_helper.py
@@ -0,0 +1,44 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Helper functions common to native, java and host-driven test runners."""
+
+import logging
+import sys
+import time
+
+
+class CustomFormatter(logging.Formatter):
+ """Custom log formatter."""
+
+ # override
+ def __init__(self, fmt='%(threadName)-4s %(message)s'):
+ # Can't use super() because in older Python versions logging.Formatter does
+ # not inherit from object.
+ logging.Formatter.__init__(self, fmt=fmt)
+ self._creation_time = time.time()
+
+ # override
+ def format(self, record):
+ # Can't use super() because in older Python versions logging.Formatter does
+ # not inherit from object.
+ msg = logging.Formatter.format(self, record)
+ if 'MainThread' in msg[:19]:
+ msg = msg.replace('MainThread', 'Main', 1)
+ timediff = time.time() - self._creation_time
+ return '%s %8.3fs %s' % (record.levelname[0], timediff, msg)
+
+
+def SetLogLevel(verbose_count):
+ """Sets log level as |verbose_count|."""
+ log_level = logging.WARNING # Default.
+ if verbose_count == 1:
+ log_level = logging.INFO
+ elif verbose_count >= 2:
+ log_level = logging.DEBUG
+ logger = logging.getLogger()
+ logger.setLevel(log_level)
+ custom_handler = logging.StreamHandler(sys.stdout)
+ custom_handler.setFormatter(CustomFormatter())
+ logging.getLogger().addHandler(custom_handler)
diff --git a/third_party/catapult/devil/devil/utils/signal_handler.py b/third_party/catapult/devil/devil/utils/signal_handler.py
new file mode 100644
index 0000000000..1230f8df5f
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/signal_handler.py
@@ -0,0 +1,48 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import contextlib
+import signal
+
+
+@contextlib.contextmanager
+def SignalHandler(signalnum, handler):
+ """Sets the signal handler for the given signal in the wrapped context.
+
+ Args:
+ signum: The signal for which a handler should be added.
+ additional_handler: The handler to add.
+ """
+ existing_handler = signal.getsignal(signalnum)
+
+ try:
+ signal.signal(signalnum, handler)
+ yield
+ finally:
+ signal.signal(signalnum, existing_handler)
+
+
+@contextlib.contextmanager
+def AddSignalHandler(signalnum, additional_handler):
+ """Adds a signal handler for the given signal in the wrapped context.
+
+ This runs the new handler after any existing handler rather than
+ replacing the existing handler.
+
+ Args:
+ signum: The signal for which a handler should be added.
+ additional_handler: The handler to add.
+ """
+ existing_handler = signal.getsignal(signalnum)
+
+ def handler(signum, frame):
+ if callable(existing_handler):
+ existing_handler(signum, frame)
+ additional_handler(signum, frame)
+
+ try:
+ signal.signal(signalnum, handler)
+ yield
+ finally:
+ signal.signal(signalnum, existing_handler)
diff --git a/third_party/catapult/devil/devil/utils/test/data/test_serial_map.json b/third_party/catapult/devil/devil/utils/test/data/test_serial_map.json
new file mode 100644
index 0000000000..f0682816a0
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/test/data/test_serial_map.json
@@ -0,0 +1 @@
+[{"phone": "Phone1", "battor": "BattOr1"}, {"phone": "Phone2", "battor": "BattOr2"}, {"phone": "Phone3", "battor": "BattOr3"}]
diff --git a/third_party/catapult/devil/devil/utils/timeout_retry.py b/third_party/catapult/devil/devil/utils/timeout_retry.py
new file mode 100644
index 0000000000..d2304629e9
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/timeout_retry.py
@@ -0,0 +1,175 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""A utility to run functions with timeouts and retries."""
+# pylint: disable=W0702
+
+import logging
+import threading
+import time
+
+from devil.utils import reraiser_thread
+from devil.utils import watchdog_timer
+
+logger = logging.getLogger(__name__)
+
+
+class TimeoutRetryThreadGroup(reraiser_thread.ReraiserThreadGroup):
+
+ def __init__(self, timeout, threads=None):
+ super(TimeoutRetryThreadGroup, self).__init__(threads)
+ self._watcher = watchdog_timer.WatchdogTimer(timeout)
+
+ def GetWatcher(self):
+ """Returns the watchdog keeping track of this thread's time."""
+ return self._watcher
+
+ def GetElapsedTime(self):
+ return self._watcher.GetElapsed()
+
+ def GetRemainingTime(self, required=0, msg=None):
+ """Get the remaining time before the thread times out.
+
+ Useful to send as the |timeout| parameter of async IO operations.
+
+ Args:
+ required: minimum amount of time that will be required to complete, e.g.,
+ some sleep or IO operation.
+ msg: error message to show if timing out.
+
+ Returns:
+ The number of seconds remaining before the thread times out, or None
+ if the thread never times out.
+
+ Raises:
+ reraiser_thread.TimeoutError if the remaining time is less than the
+ required time.
+ """
+ remaining = self._watcher.GetRemaining()
+ if remaining is not None and remaining < required:
+ if msg is None:
+ msg = 'Timeout expired'
+ if remaining > 0:
+ msg += (', wait of %.1f secs required but only %.1f secs left'
+ % (required, remaining))
+ raise reraiser_thread.TimeoutError(msg)
+ return remaining
+
+
+def CurrentTimeoutThreadGroup():
+ """Returns the thread group that owns or is blocked on the active thread.
+
+ Returns:
+ Returns None if no TimeoutRetryThreadGroup is tracking the current thread.
+ """
+ thread_group = reraiser_thread.CurrentThreadGroup()
+ while thread_group:
+ if isinstance(thread_group, TimeoutRetryThreadGroup):
+ return thread_group
+ thread_group = thread_group.blocked_parent_thread_group
+ return None
+
+
+def WaitFor(condition, wait_period=5, max_tries=None):
+ """Wait for a condition to become true.
+
+ Repeatedly call the function condition(), with no arguments, until it returns
+ a true value.
+
+ If called within a TimeoutRetryThreadGroup, it cooperates nicely with it.
+
+ Args:
+ condition: function with the condition to check
+ wait_period: number of seconds to wait before retrying to check the
+ condition
+ max_tries: maximum number of checks to make, the default tries forever
+ or until the TimeoutRetryThreadGroup expires.
+
+ Returns:
+ The true value returned by the condition, or None if the condition was
+ not met after max_tries.
+
+ Raises:
+ reraiser_thread.TimeoutError: if the current thread is a
+ TimeoutRetryThreadGroup and the timeout expires.
+ """
+ condition_name = condition.__name__
+ timeout_thread_group = CurrentTimeoutThreadGroup()
+ while max_tries is None or max_tries > 0:
+ result = condition()
+ if max_tries is not None:
+ max_tries -= 1
+ msg = ['condition', repr(condition_name), 'met' if result else 'not met']
+ if timeout_thread_group:
+ # pylint: disable=no-member
+ msg.append('(%.1fs)' % timeout_thread_group.GetElapsedTime())
+ logger.info(' '.join(msg))
+ if result:
+ return result
+ if timeout_thread_group:
+ # pylint: disable=no-member
+ timeout_thread_group.GetRemainingTime(wait_period,
+ msg='Timed out waiting for %r' % condition_name)
+ time.sleep(wait_period)
+ return None
+
+
+def AlwaysRetry(_exception):
+ return True
+
+
+def Run(func, timeout, retries, args=None, kwargs=None, desc=None,
+ error_log_func=logging.critical, retry_if_func=AlwaysRetry):
+ """Runs the passed function in a separate thread with timeouts and retries.
+
+ Args:
+ func: the function to be wrapped.
+ timeout: the timeout in seconds for each try.
+ retries: the number of retries.
+ args: list of positional args to pass to |func|.
+ kwargs: dictionary of keyword args to pass to |func|.
+ desc: An optional description of |func| used in logging. If omitted,
+ |func.__name__| will be used.
+ error_log_func: Logging function when logging errors.
+ retry_if_func: Unary callable that takes an exception and returns
+ whether |func| should be retried. Defaults to always retrying.
+
+ Returns:
+ The return value of func(*args, **kwargs).
+ """
+ if not args:
+ args = []
+ if not kwargs:
+ kwargs = {}
+ if not desc:
+ desc = func.__name__
+
+ num_try = 1
+ while True:
+ thread_name = 'TimeoutThread-%d-for-%s' % (num_try,
+ threading.current_thread().name)
+ child_thread = reraiser_thread.ReraiserThread(lambda: func(*args, **kwargs),
+ name=thread_name)
+ try:
+ thread_group = TimeoutRetryThreadGroup(timeout, threads=[child_thread])
+ thread_group.StartAll(will_block=True)
+ while True:
+ thread_group.JoinAll(watcher=thread_group.GetWatcher(), timeout=60,
+ error_log_func=error_log_func)
+ if thread_group.IsAlive():
+ logger.info('Still working on %s', desc)
+ else:
+ return thread_group.GetAllReturnValues()[0]
+ except reraiser_thread.TimeoutError as e:
+ # Timeouts already get their stacks logged.
+ if num_try > retries or not retry_if_func(e):
+ raise
+ # Do not catch KeyboardInterrupt.
+ except Exception as e: # pylint: disable=broad-except
+ if num_try > retries or not retry_if_func(e):
+ raise
+ error_log_func(
+ '(%s) Exception on %s, attempt %d of %d: %r',
+ thread_name, desc, num_try, retries + 1, e)
+ num_try += 1
diff --git a/third_party/catapult/devil/devil/utils/timeout_retry_unittest.py b/third_party/catapult/devil/devil/utils/timeout_retry_unittest.py
new file mode 100755
index 0000000000..0eeb31a4f1
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/timeout_retry_unittest.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unittests for timeout_and_retry.py."""
+
+import logging
+import time
+import unittest
+
+from devil.utils import reraiser_thread
+from devil.utils import timeout_retry
+
+
+_DEFAULT_TIMEOUT = .1
+
+
+class TestException(Exception):
+ pass
+
+
+def _CountTries(tries):
+ tries[0] += 1
+ raise TestException
+
+
+class TestRun(unittest.TestCase):
+ """Tests for timeout_retry.Run."""
+
+ def testRun(self):
+ self.assertTrue(timeout_retry.Run(
+ lambda x: x, 30, 3, [True], {}))
+
+ def testTimeout(self):
+ tries = [0]
+
+ def _sleep():
+ tries[0] += 1
+ time.sleep(1)
+
+ self.assertRaises(
+ reraiser_thread.TimeoutError, timeout_retry.Run, _sleep, .01, 1,
+ error_log_func=logging.debug)
+ self.assertEqual(tries[0], 2)
+
+ def testRetries(self):
+ tries = [0]
+ self.assertRaises(
+ TestException, timeout_retry.Run, lambda: _CountTries(tries),
+ _DEFAULT_TIMEOUT, 3, error_log_func=logging.debug)
+ self.assertEqual(tries[0], 4)
+
+ def testNoRetries(self):
+ tries = [0]
+ self.assertRaises(
+ TestException, timeout_retry.Run, lambda: _CountTries(tries),
+ _DEFAULT_TIMEOUT, 0, error_log_func=logging.debug)
+ self.assertEqual(tries[0], 1)
+
+ def testReturnValue(self):
+ self.assertTrue(timeout_retry.Run(lambda: True, _DEFAULT_TIMEOUT, 3))
+
+ def testCurrentTimeoutThreadGroup(self):
+ def InnerFunc():
+ current_thread_group = timeout_retry.CurrentTimeoutThreadGroup()
+ self.assertIsNotNone(current_thread_group)
+
+ def InnerInnerFunc():
+ self.assertEqual(current_thread_group,
+ timeout_retry.CurrentTimeoutThreadGroup())
+ return True
+ return reraiser_thread.RunAsync((InnerInnerFunc,))[0]
+
+ self.assertTrue(timeout_retry.Run(InnerFunc, _DEFAULT_TIMEOUT, 3))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/third_party/catapult/devil/devil/utils/update_mapping.py b/third_party/catapult/devil/devil/utils/update_mapping.py
new file mode 100755
index 0000000000..6666b9b084
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/update_mapping.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import argparse
+import sys
+
+from devil.utils import battor_device_mapping
+
+def parse_options():
+ """Parses and checks the command-line options.
+
+ Returns:
+ A tuple containing the options structure.
+ """
+ usage = 'Usage: ./update_mapping.py [options]'
+ desc = ('Example: ./update_mapping.py -o mapping.json.\n'
+ 'This script generates and stores a file that gives the\n'
+ 'mapping between phone serial numbers and BattOr serial numbers\n'
+ 'Mapping is based on which physical ports on the USB hubs the\n'
+ 'devices are plugged in to. For instance, if there are two hubs,\n'
+ 'the phone connected to port N on the first hub is mapped to the\n'
+ 'BattOr connected to port N on the second hub, for each N.')
+ parser = argparse.ArgumentParser(usage=usage, description=desc)
+ parser.add_argument('-o', '--output', dest='out_file',
+ default='mapping.json', type=str,
+ action='store', help='mapping file name')
+ parser.add_argument('-u', '--hub', dest='hub_types',
+ action='append', choices=['plugable_7port',
+ 'plugable_7port_usb3_part2',
+ 'plugable_7port_usb3_part3'],
+ help='USB hub types.')
+ options = parser.parse_args()
+ if not options.hub_types:
+ options.hub_types = ['plugable_7port', 'plugable_7port_usb3_part2',
+ 'plugable_7port_usb3_part3']
+ return options
+
+def main():
+ options = parse_options()
+ battor_device_mapping.GenerateSerialMapFile(options.out_file,
+ options.hub_types)
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/third_party/catapult/devil/devil/utils/usb_hubs.py b/third_party/catapult/devil/devil/utils/usb_hubs.py
new file mode 100644
index 0000000000..1b9566a356
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/usb_hubs.py
@@ -0,0 +1,165 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+PLUGABLE_7PORT_LAYOUT = {1:7,
+ 2:6,
+ 3:5,
+ 4:{1:4, 2:3, 3:2, 4:1}}
+
+PLUGABLE_7PORT_USB3_LAYOUT = {1:{1:1, 2:2, 3:3, 4:4},
+ 2:5,
+ 3:6,
+ 4:7}
+
+KEEDOX_LAYOUT = {1:1,
+ 2:2,
+ 3:3,
+ 4:{1:4, 2:5, 3:6, 4:7}}
+
+class HubType(object):
+ def __init__(self, id_func, port_mapping):
+ """Defines a type of hub.
+
+ Args:
+ id_func: [USBNode -> bool] is a function that can be run on a node
+ to determine if the node represents this type of hub.
+ port_mapping: [dict(int:(int|dict))] maps virtual to physical port
+ numbers. For instance, {3:1, 1:2, 2:3} means that virtual port 3
+ corresponds to physical port 1, virtual port 1 corresponds to physical
+ port 2, and virtual port 2 corresponds to physical port 3. In the
+ case of hubs with "internal" topology, this is represented by nested
+ maps. For instance, {1:{1:1,2:2},2:{1:3,2:4}} means, e.g. that the
+ device plugged into physical port 3 will show up as being connected
+ to port 1, on a device which is connected to port 2 on the hub.
+ """
+ self._id_func = id_func
+ # v2p = "virtual to physical" ports
+ self._v2p_port = port_mapping
+
+ def IsType(self, node):
+ """Determines if the given Node is a hub of this type.
+
+ Args:
+ node: [USBNode] Node to check.
+ """
+ return self._id_func(node)
+
+ def GetPhysicalPortToNodeTuples(self, node):
+ """Gets devices connected to the physical ports on a hub of this type.
+
+ Args:
+ node: [USBNode] Node representing a hub of this type.
+
+ Yields:
+ A series of (int, USBNode) tuples giving a physical port
+ and the USBNode connected to it.
+
+ Raises:
+ ValueError: If the given node isn't a hub of this type.
+ """
+ if self.IsType(node):
+ for res in self._GppHelper(node, self._v2p_port):
+ yield res
+ else:
+ raise ValueError('Node must be a hub of this type')
+
+ def _GppHelper(self, node, mapping):
+ """Helper function for GetPhysicalPortToNodeMap.
+
+ Gets devices connected to physical ports, based on device tree
+ rooted at the given node and the mapping between virtual and physical
+ ports.
+
+ Args:
+ node: [USBNode] Root of tree to search for devices.
+ mapping: [dict] Mapping between virtual and physical ports.
+
+ Yields:
+ A series of (int, USBNode) tuples giving a physical port
+ and the Node connected to it.
+ """
+ for (virtual, physical) in mapping.iteritems():
+ if node.HasPort(virtual):
+ if isinstance(physical, dict):
+ for res in self._GppHelper(node.PortToDevice(virtual), physical):
+ yield res
+ else:
+ yield (physical, node.PortToDevice(virtual))
+
+def _is_plugable_7port_hub(node):
+ """Check if a node is a Plugable 7-Port Hub
+ (Model USB2-HUB7BC)
+ The topology of this device is a 4-port hub,
+ with another 4-port hub connected on port 4.
+ """
+ if '1a40:0101' not in node.desc:
+ return False
+ if not node.HasPort(4):
+ return False
+ return '1a40:0101' in node.PortToDevice(4).desc
+
+# Plugable 7-Port USB-3 Hubs show up twice in the USB devices list; they have
+# two different "branches", one which has USB2 devices and one which has
+# USB3 devices. The "part2" is the "USB-2" branch of the hub, the
+# "part3" is the "USB-3" branch of the hub.
+
+def _is_plugable_7port_usb3_part2_hub(node):
+ """Check if a node is the "USB2 branch" of
+ a Plugable 7-Port USB-3 Hub (Model USB3-HUB7BC)
+ The topology of this device is a 4-port hub,
+ with another 4-port hub connected on port 1.
+ """
+ if '2109:2811' not in node.desc:
+ return False
+ if not node.HasPort(1):
+ return False
+ return '2109:2811' in node.PortToDevice(1).desc
+
+def _is_plugable_7port_usb3_part3_hub(node):
+ """Check if a node is the "USB3 branch" of
+ a Plugable 7-Port USB-3 Hub (Model USB3-HUB7BC)
+ The topology of this device is a 4-port hub,
+ with another 4-port hub connected on port 1.
+ """
+ if '2109:8110' not in node.desc:
+ return False
+ if not node.HasPort(1):
+ return False
+ return '2109:8110' in node.PortToDevice(1).desc
+
+def _is_keedox_hub(node):
+ """Check if a node is a Keedox hub.
+ The topology of this device is a 4-port hub,
+ with another 4-port hub connected on port 4.
+ """
+ if '0bda:5411' not in node.desc:
+ return False
+ if not node.HasPort(4):
+ return False
+ return '0bda:5411' in node.PortToDevice(4).desc
+
+
+PLUGABLE_7PORT = HubType(_is_plugable_7port_hub, PLUGABLE_7PORT_LAYOUT)
+PLUGABLE_7PORT_USB3_PART2 = HubType(_is_plugable_7port_usb3_part2_hub,
+ PLUGABLE_7PORT_USB3_LAYOUT)
+PLUGABLE_7PORT_USB3_PART3 = HubType(_is_plugable_7port_usb3_part3_hub,
+ PLUGABLE_7PORT_USB3_LAYOUT)
+KEEDOX = HubType(_is_keedox_hub, KEEDOX_LAYOUT)
+
+ALL_HUBS = [PLUGABLE_7PORT,
+ PLUGABLE_7PORT_USB3_PART2,
+ PLUGABLE_7PORT_USB3_PART3,
+ KEEDOX]
+
+def GetHubType(type_name):
+ if type_name == 'plugable_7port':
+ return PLUGABLE_7PORT
+ if type_name == 'plugable_7port_usb3_part2':
+ return PLUGABLE_7PORT_USB3_PART2
+ if type_name == 'plugable_7port_usb3_part3':
+ return PLUGABLE_7PORT_USB3_PART3
+ if type_name == 'keedox':
+ return KEEDOX
+ else:
+ raise ValueError('Invalid hub type')
diff --git a/third_party/catapult/devil/devil/utils/watchdog_timer.py b/third_party/catapult/devil/devil/utils/watchdog_timer.py
new file mode 100644
index 0000000000..2f4c46455b
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/watchdog_timer.py
@@ -0,0 +1,47 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""WatchdogTimer timeout objects."""
+
+import time
+
+
+class WatchdogTimer(object):
+ """A resetable timeout-based watchdog.
+
+ This object is threadsafe.
+ """
+
+ def __init__(self, timeout):
+ """Initializes the watchdog.
+
+ Args:
+ timeout: The timeout in seconds. If timeout is None it will never timeout.
+ """
+ self._start_time = time.time()
+ self._timeout = timeout
+
+ def Reset(self):
+ """Resets the timeout countdown."""
+ self._start_time = time.time()
+
+ def GetElapsed(self):
+ """Returns the elapsed time of the watchdog."""
+ return time.time() - self._start_time
+
+ def GetRemaining(self):
+ """Returns the remaining time of the watchdog."""
+ if self._timeout:
+ return self._timeout - self.GetElapsed()
+ else:
+ return None
+
+ def IsTimedOut(self):
+ """Whether the watchdog has timed out.
+
+ Returns:
+ True if the watchdog has timed out, False otherwise.
+ """
+ remaining = self.GetRemaining()
+ return remaining is not None and remaining < 0
diff --git a/third_party/catapult/devil/devil/utils/zip_utils.py b/third_party/catapult/devil/devil/utils/zip_utils.py
new file mode 100644
index 0000000000..eaa6a2df01
--- /dev/null
+++ b/third_party/catapult/devil/devil/utils/zip_utils.py
@@ -0,0 +1,33 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import os
+import zipfile
+
+logger = logging.getLogger(__name__)
+
+
+def WriteToZipFile(zip_file, path, arc_path):
+ """Recursively write |path| to |zip_file| as |arc_path|.
+
+ zip_file: An open instance of zipfile.ZipFile.
+ path: An absolute path to the file or directory to be zipped.
+ arc_path: A relative path within the zip file to which the file or directory
+ located at |path| should be written.
+ """
+ if os.path.isdir(path):
+ for dir_path, _, file_names in os.walk(path):
+ dir_arc_path = os.path.join(arc_path, os.path.relpath(dir_path, path))
+ logger.debug('dir: %s -> %s', dir_path, dir_arc_path)
+ zip_file.write(dir_path, dir_arc_path, zipfile.ZIP_STORED)
+ for f in file_names:
+ file_path = os.path.join(dir_path, f)
+ file_arc_path = os.path.join(dir_arc_path, f)
+ logger.debug('file: %s -> %s', file_path, file_arc_path)
+ zip_file.write(file_path, file_arc_path, zipfile.ZIP_DEFLATED)
+ else:
+ logger.debug('file: %s -> %s', path, arc_path)
+ zip_file.write(path, arc_path, zipfile.ZIP_DEFLATED)
+