diff options
author | David Pursell <dpursell@google.com> | 2015-10-19 19:46:35 +0000 |
---|---|---|
committer | Android (Google) Code Review <android-gerrit@google.com> | 2015-10-19 19:46:35 +0000 |
commit | 89c81bcc278c2f9e4cd5a52e39dc817be9b493d3 (patch) | |
tree | 5d86832357a2db120e26240ddbe632a63e5d09c1 | |
parent | 83992d7ce5e0ce9de26b2f4449652aa105613af9 (diff) | |
parent | 1d2fccc0328f742f88d7f333cb7007ad1205e0f9 (diff) | |
download | bdk-89c81bcc278c2f9e4cd5a52e39dc817be9b493d3.tar.gz |
Merge "adb: add python tests." into mnc-brillo-dev
-rw-r--r-- | debugging/adb/device.py | 96 | ||||
-rw-r--r-- | debugging/adb/test_brillo_device.py | 101 | ||||
-rw-r--r-- | debugging/adb/test_device.py | 595 | ||||
-rwxr-xr-x | debugging/brillo_adb_test.py | 248 |
4 files changed, 1013 insertions, 27 deletions
diff --git a/debugging/adb/device.py b/debugging/adb/device.py index 392fafe..e040cdf 100644 --- a/debugging/adb/device.py +++ b/debugging/adb/device.py @@ -14,6 +14,7 @@ # limitations under the License. # import atexit +import contextlib import logging import os import re @@ -151,36 +152,77 @@ def get_emulator_device(): return _get_device_by_type('-e') +@contextlib.contextmanager +def _file_deleter(f): + yield + if f: + f.close() + os.remove(f.name) + + +# Internal helper that may return a temporary file (containing a command line +# in UTF-8) that should be executed with the help of _get_subprocess_args(). +def _get_windows_unicode_helper(args): + # Only do this slow work-around if Unicode is in the cmd line on Windows. + if (os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args)): + return None + + # cmd.exe requires a suffix to know that it is running a batch file. + # We can't use delete=True because that causes File Share Mode Delete to be + # used which prevents the file from being opened by other processes that + # don't use that File Share Mode. The caller must manually delete the file. + tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False) + # @ in batch suppresses echo of the current line. + # Change the codepage to 65001, the UTF-8 codepage. + tf.write('@chcp 65001 > nul\r\n') + tf.write('@') + # Properly quote all the arguments and encode in UTF-8. + tf.write(subprocess.list2cmdline(args).encode('utf-8')) + tf.close() + return tf + + +# Let the caller know how to run the batch file. Takes subprocess.check_output() +# or subprocess.Popen() args and returns a new tuple that should be passed +# instead, or the original args if there is no file +def _get_subprocess_args(args, helper_file): + if helper_file: + # Concatenate our new command line args with any other function args. + return (['cmd.exe', '/c', helper_file.name],) + args[1:] + else: + return args + + # Call this instead of subprocess.check_output() to work-around issue in Python # 2's subprocess class on Windows where it doesn't support Unicode. This # writes the command line to a UTF-8 batch file that is properly interpreted # by cmd.exe. -def _subprocess_check_output(*popenargs, **kwargs): - # Only do this slow work-around if Unicode is in the cmd line. - if (os.name == 'nt' and - any(isinstance(arg, unicode) for arg in popenargs[0])): - # cmd.exe requires a suffix to know that it is running a batch file - tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False) - # @ in batch suppresses echo of the current line. - # Change the codepage to 65001, the UTF-8 codepage. - tf.write('@chcp 65001 > nul\r\n') - tf.write('@') - # Properly quote all the arguments and encode in UTF-8. - tf.write(subprocess.list2cmdline(popenargs[0]).encode('utf-8')) - tf.close() - +def _subprocess_check_output(*args, **kwargs): + helper = _get_windows_unicode_helper(args[0]) + with _file_deleter(helper): try: - result = subprocess.check_output(['cmd.exe', '/c', tf.name], - **kwargs) + return subprocess.check_output( + *_get_subprocess_args(args, helper), **kwargs) except subprocess.CalledProcessError as e: # Show real command line instead of the cmd.exe command line. - raise subprocess.CalledProcessError(e.returncode, popenargs[0], + raise subprocess.CalledProcessError(e.returncode, args[0], output=e.output) - finally: - os.remove(tf.name) - return result - else: - return subprocess.check_output(*popenargs, **kwargs) + + +# Call this instead of subprocess.Popen(). Like _subprocess_check_output(). +class _subprocess_Popen(subprocess.Popen): + def __init__(self, *args, **kwargs): + # Save reference to helper so that it can be deleted once it is no + # longer used. + self.helper = _get_windows_unicode_helper(args[0]) + super(_subprocess_Popen, self).__init__( + *_get_subprocess_args(args, self.helper), **kwargs) + + def __del__(self, *args, **kwargs): + super(_subprocess_Popen, self).__del__(*args, **kwargs) + if self.helper: + os.remove(self.helper.name) + class AndroidDevice(object): # Delimiter string to indicate the start of the exit code. @@ -270,7 +312,7 @@ class AndroidDevice(object): """Calls `adb shell` Args: - cmd: string shell command to execute. + cmd: command to execute as a list of strings. Returns: A (stdout, stderr) tuple. Stderr may be combined into stdout @@ -288,7 +330,7 @@ class AndroidDevice(object): """Calls `adb shell` Args: - cmd: string shell command to execute. + cmd: command to execute as a list of strings. Returns: An (exit_code, stdout, stderr) tuple. Stderr may be combined @@ -296,7 +338,7 @@ class AndroidDevice(object): """ cmd = self._make_shell_cmd(cmd) logging.info(' '.join(cmd)) - p = subprocess.Popen( + p = _subprocess_Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if self.SHELL_PROTOCOL_FEATURE in self.features: @@ -339,8 +381,8 @@ class AndroidDevice(object): os.setpgrp() preexec_fn = _wrapper - p = subprocess.Popen(command, creationflags=creationflags, - preexec_fn=preexec_fn, **kwargs) + p = _subprocess_Popen(command, creationflags=creationflags, + preexec_fn=preexec_fn, **kwargs) if kill_atexit: atexit.register(p.kill) diff --git a/debugging/adb/test_brillo_device.py b/debugging/adb/test_brillo_device.py new file mode 100644 index 0000000..5ecd5a0 --- /dev/null +++ b/debugging/adb/test_brillo_device.py @@ -0,0 +1,101 @@ +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Brillo device `adb` tests. + +This module contains some tests exercising `adb` functionality for +Brillo devices. This pulls in some tests from the generic test_device +module, but omits tests which require framework-level functionality not +available on Brillo Tier 1 devices. We also add some additional tests +such as large file transfers and some common operations. + +Requirements: + * Python "unittest" module must be installed. + * Exactly 1 device must be connected, or the ANDROID_SERIAL + environment variable must be set to select the desired device. +""" + +import os +import subprocess +import tempfile +import unittest + +from adb import test_device + + +def create_data_file(size): + """Creates a data file of |size| bytes. + + Args: + size: desired file size in bytes. + + Returns: + New file name. Caller is responsible for deleting the file. + """ + temp = tempfile.NamedTemporaryFile('wb', delete=False) + if size > 0: + data_str = ''.join(chr(i) for i in range(256)) * 1000 + while len(data_str) < size: + temp.write(data_str) + size -= len(data_str) + temp.write(data_str[:size]) + temp.close() + return temp.name + + +class ShellTest(test_device.ShellTest): + """Basic shell tests imported from test_device.""" + + +class FileTest(test_device.FileOperationsTest): + """Small file push/pull tests imported from test_device.""" + + def test_large_file(self): + """Tests pushing and pulling a larger file.""" + # Use a 10MB file to test larger push/pull operations. + f1_name = create_data_file(10 * 1000 * 1000) + self.device.push(local=f1_name, remote=self.DEVICE_TEMP_FILE) + + f2_name = create_data_file(0) + self.device.pull(remote=self.DEVICE_TEMP_FILE, local=f2_name) + + if os.name == 'nt': + file_comp_program = 'fc' + else: + file_comp_program = 'cmp' + subprocess.check_output([file_comp_program, f1_name, f2_name]) + + os.remove(f1_name) + os.remove(f2_name) + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + + +class BasicUsageTest(test_device.DeviceTest): + """Tests some common operations users might perform.""" + + def test_logcat(self): + """Check that logcat gives some output.""" + stdout = subprocess.check_output(self.device.adb_cmd + ['logcat', '-d']) + self.assertTrue(len(stdout) > 0) + + def test_shell_dmesg(self): + """Check that `adb shell dmesg` gives some output.""" + stdout, stderr = self.device.shell(['dmesg']) + self.assertTrue(len(stdout) > 0) + self.assertTrue(stderr == "") + + +def suite(): + """Returns the full test suite for this module.""" + return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/debugging/adb/test_device.py b/debugging/adb/test_device.py new file mode 100644 index 0000000..c4f7f36 --- /dev/null +++ b/debugging/adb/test_device.py @@ -0,0 +1,595 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import print_function + +import hashlib +import os +import posixpath +import random +import shlex +import shutil +import signal +import string +import subprocess +import tempfile +import unittest + +import mock + +import adb + + +def requires_root(func): + def wrapper(self, *args): + if self.device.get_prop('ro.debuggable') != '1': + raise unittest.SkipTest('requires rootable build') + + was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' + if not was_root: + self.device.root() + self.device.wait() + + try: + func(self, *args) + finally: + if not was_root: + self.device.unroot() + self.device.wait() + + return wrapper + + +class GetDeviceTest(unittest.TestCase): + def setUp(self): + self.android_serial = os.getenv('ANDROID_SERIAL') + if 'ANDROID_SERIAL' in os.environ: + del os.environ['ANDROID_SERIAL'] + + def tearDown(self): + if self.android_serial is not None: + os.environ['ANDROID_SERIAL'] = self.android_serial + else: + if 'ANDROID_SERIAL' in os.environ: + del os.environ['ANDROID_SERIAL'] + + @mock.patch('adb.device.get_devices') + def test_explicit(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_from_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'foo' + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_arg_beats_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'bar' + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_such_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) + + os.environ['ANDROID_SERIAL'] = 'baz' + self.assertRaises(adb.DeviceNotFoundError, adb.get_device) + + @mock.patch('adb.device.get_devices') + def test_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo'] + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) + + +class DeviceTest(unittest.TestCase): + def setUp(self): + self.device = adb.get_device() + + +class ShellTest(DeviceTest): + def _interactive_shell(self, shell_args, input): + """Runs an interactive adb shell. + + Args: + shell_args: List of string arguments to `adb shell`. + input: String input to send to the interactive shell. + + Returns: + The remote exit code. + + Raises: + unittest.SkipTest: The device doesn't support exit codes. + """ + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('exit codes are unavailable on this device') + + proc = subprocess.Popen( + self.device.adb_cmd + ['shell'] + shell_args, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + # Closing host-side stdin doesn't currently trigger the interactive + # shell to exit so we need to explicitly add an exit command to + # close the session from the device side, and append newline to complete + # the interactive command. + proc.communicate(input + '; exit\n') + return proc.returncode + + def test_cat(self): + """Check that we can at least cat a file.""" + out = self.device.shell(['cat', '/proc/uptime'])[0].strip() + elements = out.split() + self.assertEqual(len(elements), 2) + + uptime, idle = elements + self.assertGreater(float(uptime), 0.0) + self.assertGreater(float(idle), 0.0) + + def test_throws_on_failure(self): + self.assertRaises(adb.ShellError, self.device.shell, ['false']) + + def test_output_not_stripped(self): + out = self.device.shell(['echo', 'foo'])[0] + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_shell_nocheck_failure(self): + rc, out, _ = self.device.shell_nocheck(['false']) + self.assertNotEqual(rc, 0) + self.assertEqual(out, '') + + def test_shell_nocheck_output_not_stripped(self): + rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_can_distinguish_tricky_results(self): + # If result checking on ADB shell is naively implemented as + # `adb shell <cmd>; echo $?`, we would be unable to distinguish the + # output from the result for a cmd of `echo -n 1`. + rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) + self.assertEqual(rc, 0) + self.assertEqual(out, '1') + + def test_line_endings(self): + """Ensure that line ending translation is not happening in the pty. + + Bug: http://b/19735063 + """ + output = self.device.shell(['uname'])[0] + self.assertEqual(output, 'Linux' + self.device.linesep) + + def test_default_pty_logic(self): + """Verify default PTY logic for shells. + + Interactive shells should use a PTY, non-interactive should not. + + Bug: http://b/21215503 + """ + # [ -t 0 ] is used (rather than `tty`) to provide portability. This + # gives an exit code of 0 iff stdin is connected to a terminal. + self.assertEqual(0, self._interactive_shell([], '[ -t 0 ]')) + self.assertEqual(1, self.device.shell_nocheck(['[ -t 0 ]'])[0]) + + def test_pty_arguments(self): + """Tests the -T and -t arguments to manually control PTY.""" + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('PTY arguments unsupported on this device') + + self.assertEqual(0, self._interactive_shell(['-t'], '[ -t 0 ]')) + self.assertEqual(1, self._interactive_shell(['-T'], '[ -t 0 ]')) + self.assertEqual(0, self.device.shell_nocheck(['-t', '[ -t 0 ]'])[0]) + self.assertEqual(1, self.device.shell_nocheck(['-T', '[ -t 0 ]'])[0]) + + def test_shell_protocol(self): + """Tests the shell protocol on the device. + + If the device supports shell protocol, this gives us the ability + to separate stdout/stderr and return the exit code directly. + + Bug: http://b/19734861 + """ + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('shell protocol unsupported on this device') + + # Shell protocol should be used by default. + result = self.device.shell_nocheck( + shlex.split('echo foo; echo bar >&2; exit 17')) + self.assertEqual(17, result[0]) + self.assertEqual('foo' + self.device.linesep, result[1]) + self.assertEqual('bar' + self.device.linesep, result[2]) + + self.assertEqual(17, self._interactive_shell([], 'exit 17')) + + # -x flag should disable shell protocol. + result = self.device.shell_nocheck( + shlex.split('-x echo foo; echo bar >&2; exit 17')) + self.assertEqual(0, result[0]) + self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) + self.assertEqual('', result[2]) + + self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) + + def test_non_interactive_sigint(self): + """Tests that SIGINT in a non-interactive shell kills the process. + + This requires the shell protocol in order to detect the broken + pipe; raw data transfer mode will only see the break once the + subprocess tries to read or write. + + Bug: http://b/23825725 + """ + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('shell protocol unsupported on this device') + + # Start a long-running process. + sleep_proc = subprocess.Popen( + self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + remote_pid = sleep_proc.stdout.readline().strip() + self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') + proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) + + # Verify that the process is running, send signal, verify it stopped. + self.device.shell(proc_query) + os.kill(sleep_proc.pid, signal.SIGINT) + sleep_proc.communicate() + self.assertEqual(1, self.device.shell_nocheck(proc_query)[0], + 'subprocess failed to terminate') + + def test_non_interactive_stdin(self): + """Tests that non-interactive shells send stdin.""" + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('non-interactive stdin unsupported ' + 'on this device') + + # Test both small and large inputs. + small_input = 'foo' + large_input = '\n'.join(c * 100 for c in (string.ascii_letters + + string.digits)) + + for input in (small_input, large_input): + proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = proc.communicate(input) + self.assertEqual(input.splitlines(), stdout.splitlines()) + self.assertEqual('', stderr) + + +class ArgumentEscapingTest(DeviceTest): + def test_shell_escaping(self): + """Make sure that argument escaping is somewhat sane.""" + + # http://b/19734868 + # Note that this actually matches ssh(1)'s behavior --- it's + # converted to `sh -c echo hello; echo world` which sh interprets + # as `sh -c echo` (with an argument to that shell of "hello"), + # and then `echo world` back in the first shell. + result = self.device.shell( + shlex.split("sh -c 'echo hello; echo world'"))[0] + result = result.splitlines() + self.assertEqual(['', 'world'], result) + # If you really wanted "hello" and "world", here's what you'd do: + result = self.device.shell( + shlex.split(r'echo hello\;echo world'))[0].splitlines() + self.assertEqual(['hello', 'world'], result) + + # http://b/15479704 + result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() + self.assertEqual('t', result) + result = self.device.shell( + shlex.split("sh -c 'true && echo t'"))[0].strip() + self.assertEqual('t', result) + + # http://b/20564385 + result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() + self.assertEqual('t', result) + result = self.device.shell( + shlex.split(r'echo -n 123\;uname'))[0].strip() + self.assertEqual('123Linux', result) + + def test_install_argument_escaping(self): + """Make sure that install argument escaping works.""" + # http://b/20323053, http://b/3090932. + for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): + tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, + delete=False) + tf.close() + + # Installing bogus .apks fails if the device supports exit codes. + try: + output = self.device.install(tf.name) + except subprocess.CalledProcessError as e: + output = e.output + + self.assertIn(file_suffix, output) + os.remove(tf.name) + + +class RootUnrootTest(DeviceTest): + def _test_root(self): + message = self.device.root() + if 'adbd cannot run as root in production builds' in message: + return + self.device.wait() + self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) + + def _test_unroot(self): + self.device.unroot() + self.device.wait() + self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) + + def test_root_unroot(self): + """Make sure that adb root and adb unroot work, using id(1).""" + if self.device.get_prop('ro.debuggable') != '1': + raise unittest.SkipTest('requires rootable build') + + original_user = self.device.shell(['id', '-un'])[0].strip() + try: + if original_user == 'root': + self._test_unroot() + self._test_root() + elif original_user == 'shell': + self._test_root() + self._test_unroot() + finally: + if original_user == 'root': + self.device.root() + else: + self.device.unroot() + self.device.wait() + + +class TcpIpTest(DeviceTest): + def test_tcpip_failure_raises(self): + """adb tcpip requires a port. + + Bug: http://b/22636927 + """ + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, '') + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, 'foo') + + +class SystemPropertiesTest(DeviceTest): + def test_get_prop(self): + self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') + + @requires_root + def test_set_prop(self): + prop_name = 'foo.bar' + self.device.shell(['setprop', prop_name, '""']) + + self.device.set_prop(prop_name, 'qux') + self.assertEqual( + self.device.shell(['getprop', prop_name])[0].strip(), 'qux') + + +def compute_md5(string): + hsh = hashlib.md5() + hsh.update(string) + return hsh.hexdigest() + + +def get_md5_prog(device): + """Older platforms (pre-L) had the name md5 rather than md5sum.""" + try: + device.shell(['md5sum', '/proc/uptime']) + return 'md5sum' + except subprocess.CalledProcessError: + return 'md5' + + +class HostFile(object): + def __init__(self, handle, checksum): + self.handle = handle + self.checksum = checksum + self.full_path = handle.name + self.base_name = os.path.basename(self.full_path) + + +class DeviceFile(object): + def __init__(self, checksum, full_path): + self.checksum = checksum + self.full_path = full_path + self.base_name = posixpath.basename(self.full_path) + + +def make_random_host_files(in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for _ in xrange(num_files): + file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) + + size = random.randrange(min_size, max_size, 1024) + rand_str = os.urandom(size) + file_handle.write(rand_str) + file_handle.flush() + file_handle.close() + + md5 = compute_md5(rand_str) + files.append(HostFile(file_handle, md5)) + return files + + +def make_random_device_files(device, in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for file_num in xrange(num_files): + size = random.randrange(min_size, max_size, 1024) + + base_name = 'device_tmpfile' + str(file_num) + full_path = posixpath.join(in_dir, base_name) + + device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), + 'bs={}'.format(size), 'count=1']) + dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() + + files.append(DeviceFile(dev_md5, full_path)) + return files + + +class FileOperationsTest(DeviceTest): + SCRATCH_DIR = '/data/local/tmp' + DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' + DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' + + def _test_push(self, local_file, checksum): + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE) + dev_md5, _ = self.device.shell([get_md5_prog(self.device), + self.DEVICE_TEMP_FILE])[0].split() + self.assertEqual(checksum, dev_md5) + self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) + + def test_push(self): + """Push a randomly generated file to specified device.""" + kbytes = 512 + tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) + rand_str = os.urandom(1024 * kbytes) + tmp.write(rand_str) + tmp.close() + self._test_push(tmp.name, compute_md5(rand_str)) + os.remove(tmp.name) + + # TODO: write push directory test. + + def _test_pull(self, remote_file, checksum): + tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) + tmp_write.close() + self.device.pull(remote=remote_file, local=tmp_write.name) + with open(tmp_write.name, 'rb') as tmp_read: + host_contents = tmp_read.read() + host_md5 = compute_md5(host_contents) + self.assertEqual(checksum, host_md5) + os.remove(tmp_write.name) + + def test_pull(self): + """Pull a randomly generated file from specified device.""" + kbytes = 512 + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + cmd = ['dd', 'if=/dev/urandom', + 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', + 'count={}'.format(kbytes)] + self.device.shell(cmd) + dev_md5, _ = self.device.shell( + [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() + self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) + self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) + + def test_pull_dir(self): + """Pull a randomly generated directory of files from the device.""" + host_dir = tempfile.mkdtemp() + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) + + # Populate device directory with random files. + temp_files = make_random_device_files( + self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) + + self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) + + for temp_file in temp_files: + host_path = os.path.join(host_dir, temp_file.base_name) + with open(host_path, 'rb') as host_file: + host_md5 = compute_md5(host_file.read()) + self.assertEqual(host_md5, temp_file.checksum) + + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + if host_dir is not None: + shutil.rmtree(host_dir) + + def test_sync(self): + """Sync a randomly generated directory of files to specified device.""" + base_dir = tempfile.mkdtemp() + + # Create mirror device directory hierarchy within base_dir. + full_dir_path = base_dir + self.DEVICE_TEMP_DIR + os.makedirs(full_dir_path) + + # Create 32 random files within the host mirror. + temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) + + # Clean up any trash on the device. + device = adb.get_device(product=base_dir) + device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + + device.sync('data') + + # Confirm that every file on the device mirrors that on the host. + for temp_file in temp_files: + device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, + temp_file.base_name) + dev_md5, _ = device.shell( + [get_md5_prog(self.device), device_full_path])[0].split() + self.assertEqual(temp_file.checksum, dev_md5) + + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + if base_dir is not None: + shutil.rmtree(base_dir) + + def test_unicode_paths(self): + """Ensure that we can support non-ASCII paths, even on Windows.""" + name = u'로보카 폴리' + + ## push. + tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) + tf.close() + self.device.push(tf.name, u'/data/local/tmp/adb-test-{}'.format(name)) + os.remove(tf.name) + self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) + + # pull. + cmd = ['touch', u'"/data/local/tmp/adb-test-{}"'.format(name)] + self.device.shell(cmd) + + tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) + tf.close() + self.device.pull(u'/data/local/tmp/adb-test-{}'.format(name), tf.name) + os.remove(tf.name) + self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) + + +def main(): + random.seed(0) + if len(adb.get_devices()) > 0: + suite = unittest.TestLoader().loadTestsFromName(__name__) + unittest.TextTestRunner(verbosity=3).run(suite) + else: + print('Test suite must be run with attached devices') + + +if __name__ == '__main__': + main() diff --git a/debugging/brillo_adb_test.py b/debugging/brillo_adb_test.py new file mode 100755 index 0000000..ce47d5b --- /dev/null +++ b/debugging/brillo_adb_test.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script to run `adb` tests for Brillo. + +This script provides an easy way to run adb tests against a Brillo +device. By default it will just run the functional test once and exit. +If a duration is specified, the script will also run a more data- +intensive file test for the given duration. + +Ctrl+C can be used to stop the file test at any time. + +Example usage: + # Functional tests only. + $ brillo_adb_test.py + + # Functional tests + file test 3 times. + $ brillo_adb_test.py --duration 3 + + # Functional tests + file test for 20 minutes. + $ brillo_adb_test.py --duration 20m + + # Only file test for 4.5 hours. + $ brillo_adb_test.py --duration 4.5h --skip-functional-tests +""" + + +from __future__ import print_function + +import argparse +import os +import subprocess +import time +import unittest + +from adb import test_brillo_device + + +def time_string(seconds): + """Takes seconds and returns a simple H:MM:SS time string""" + minutes, seconds = divmod(int(seconds), 60) + hours, minutes = divmod(minutes, 60) + return '{:01}:{:02}:{:02}'.format(hours, minutes, seconds) + + +class TestRepeater(object): + """Manages running a test multiple times""" + + def __init__(self, iterations, duration, skip_functional_tests): + """Constructor. + + Args: + iterations: how many times to run the test, or None to use a time + duration instead. + duration: how long to run the test in seconds, or None to use an + iteration count instead. + skip_functional_tests: True to skip the initial functional tests. + """ + self.start_time = None + self.iterations = None + self.duration = None + if iterations is not None: + self.iterations = int(iterations) + elif duration is not None: + self.duration = duration + else: + raise ValueError('Must specify test iterations or duration.') + self.iteration_start_time = self.start_time + self.successes = 0 + self.failures = 0 + self.skip_functional_tests = skip_functional_tests + + def _start_iteration(self): + """Returns True if we should start another test iteration.""" + if self.iterations is not None: + self.iterations -= 1 + return self.iterations >= 0 + else: + now = time.time() + if now >= self.start_time + self.duration: + return False + self.iteration_start_time = now + return True + + def _result_string(self): + """Returns the current results as a string. + + Returned string depends on whether we're counting iterations or + time, and looks like this: + [Fri Oct 16 16:59:30 2015] 3/3 | pass: 2 fail: 0 + or this: + [Fri Oct 16 16:59:56 2015] 0:00:03/0:00:05 | pass: 2 fail: 0 + """ + timestamp = time.strftime('%c') + if self.iterations is not None: + iter_count = self.successes + self.failures + 1 + progress = '{}/{}'.format(iter_count, iter_count + self.iterations) + else: + progress = '{}/{}'.format( + time_string(self.iteration_start_time - self.start_time), + time_string(self.duration)) + return '[{}] {} | pass: {} fail: {}'.format( + timestamp, progress, self.successes, self.failures) + + def run_test(self): + """Run the functional and transfer tests. + + Returns: + An exit code to return from the script; 0 if all tests passed, + 1 if one or more test failed. + """ + if not self.skip_functional_tests: + suite = test_brillo_device.suite() + # unittest verbosity levels: + # 0 = result line only when done. + # 1 = 1-character results as tests run. + # 2 = test descriptions and results as they run. + result = unittest.TextTestRunner(verbosity=2).run(suite) + if not result.wasSuccessful(): + return 1 + print() + + data_file_name = None + self.start_time = time.time() + try: + while self._start_iteration(): + if not data_file_name: + # Currently crashes are triggered when a lot of data is + # sent from the device to the host, which this will + # replicate. Later on we could add host->device transfers + # also if needed. + size = 20 * 1000 * 1000 + data_file_name = test_brillo_device.create_data_file(size) + print('Pushing file to device') + print('-------------------------') + try: + subprocess.check_call( + ['adb', 'push', data_file_name, + test_brillo_device.FileTest.DEVICE_TEMP_FILE]) + except subprocess.CalledProcessError as e: + print('"{}" failed, aborting test'.format( + ' '.join(e.cmd))) + break + print() + print('Starting file pull loop') + print('-------------------------') + + try: + print(self._result_string()) + subprocess.check_call( + ['adb', 'pull', + test_brillo_device.FileTest.DEVICE_TEMP_FILE, + data_file_name]) + self.successes += 1 + except subprocess.CalledProcessError as e: + self.failures += 1 + print('Failure: "{}" exited with code {}'.format( + e.cmd, e.returncode)) + print(e.output) + print() + + except KeyboardInterrupt: + print('\nStopping test due to keyboard interrupt') + + if data_file_name: + print('Test finished in {} with {}/{} success'.format( + time_string(time.time() - self.start_time), + self.successes, self.successes + self.failures)) + os.remove(data_file_name) + try: + subprocess.check_call( + ['adb', 'shell', 'rm', '-f', + test_brillo_device.FileTest.DEVICE_TEMP_FILE]) + except subprocess.CalledProcessError: + # If previous failures happened this may not work which is fine. + pass + + if self.failures > 0: + return 1 + else: + return 0 + + +def parse_time_argument(arg, default_unit=''): + """Parses user input to get a time value. + + Args: + arg: user input string. + default_unit: unit to assume if unspecified. + + Returns: + A (value, seconds) tuple. Value will be set if no unit was + specified, otherwise seconds will be set. + + Raises: + ValueError: invalid |arg| value. + """ + # We can be a little loose with parsing, assume anything starting + # with 's' is seconds, 'm' for minutes, etc. + arg += default_unit + for char, multiplier in (('s', 1), ('m', 60), ('h', 3600), ('d', 86400)): + value, sep, _ = arg.partition(char) + if sep: + return (None, float(value) * multiplier) + return (float(arg), None) + + +def parse_arguments(): + """Parses the command-line arguments. + + Returns: + A TestRepeater object. + """ + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('-d', '--duration', default='0', + type=parse_time_argument, + help='How long to run the test. <n> = iterations,' + ' <n>m = minutes, <n>h = hours.') + parser.add_argument('-s', '--skip-functional-tests', action='store_true', + help='Skip the initial functional tests.') + options = parser.parse_args() + + return TestRepeater(options.duration[0], options.duration[1], + options.skip_functional_tests) + + +def main(): + test_repeater = parse_arguments() + return test_repeater.run_test() + + +if __name__ == '__main__': + main() |