diff options
author | Steve Fung <stevefung@google.com> | 2016-04-19 03:44:23 -0700 |
---|---|---|
committer | Steve Fung <stevefung@google.com> | 2016-04-19 03:44:33 -0700 |
commit | b3baeb0038327709517abe6c9ae3e7682a00e0e5 (patch) | |
tree | 2c720739a44a68530c02d68ea60e6a299dcecb2b | |
parent | f574c6a64c36b12469873e82ab0ae4872059aebb (diff) | |
download | bdk-b3baeb0038327709517abe6c9ae3e7682a00e0e5.tar.gz |
Convert core/ to 4 space indent
Convert all files in the core/ folder to a 4 space indent
to comply with PEP8 style rules.
Bug: 28007659
Test: `python test_runner.py` passes.
Test: pylint passes.
Change-Id: If470162a50e29621cea84aef02dadb816ca1c46c
-rw-r--r-- | cli/lib/core/build.py | 191 | ||||
-rw-r--r-- | cli/lib/core/build_unittest.py | 213 | ||||
-rw-r--r-- | cli/lib/core/config.py | 315 | ||||
-rw-r--r-- | cli/lib/core/config_stub.py | 48 | ||||
-rw-r--r-- | cli/lib/core/config_unittest.py | 196 | ||||
-rw-r--r-- | cli/lib/core/image_build.py | 630 | ||||
-rw-r--r-- | cli/lib/core/image_build_unittest.py | 906 | ||||
-rw-r--r-- | cli/lib/core/popen.py | 8 | ||||
-rw-r--r-- | cli/lib/core/product.py | 255 | ||||
-rw-r--r-- | cli/lib/core/properties.py | 442 | ||||
-rw-r--r-- | cli/lib/core/provision.py | 218 | ||||
-rw-r--r-- | cli/lib/core/provision_unittest.py | 364 | ||||
-rw-r--r-- | cli/lib/core/timer.py | 128 | ||||
-rw-r--r-- | cli/lib/core/timer_unittest.py | 220 | ||||
-rw-r--r-- | cli/lib/core/tool.py | 519 | ||||
-rw-r--r-- | cli/lib/core/tool_unittest.py | 547 | ||||
-rw-r--r-- | cli/lib/core/user_config.py | 122 | ||||
-rw-r--r-- | cli/lib/core/user_config_stub.py | 27 | ||||
-rw-r--r-- | cli/lib/core/user_config_unittest.py | 165 | ||||
-rw-r--r-- | cli/lib/core/util.py | 192 | ||||
-rw-r--r-- | cli/lib/core/util_stub.py | 139 | ||||
-rw-r--r-- | cli/lib/core/util_unittest.py | 193 |
22 files changed, 3041 insertions, 2997 deletions
diff --git a/cli/lib/core/build.py b/cli/lib/core/build.py index 08b98c4..c8f7b42 100644 --- a/cli/lib/core/build.py +++ b/cli/lib/core/build.py @@ -27,15 +27,15 @@ import error class Error(error.Error): - """General build failure.""" + """General build failure.""" class BuildTypeError(Error): - """Raised when an invalid build type is given.""" + """Raised when an invalid build type is given.""" class BuildUtilityError(Error): - """Raised when a build utility fails or is missing.""" + """Raised when a build utility fails or is missing.""" BUILD_TYPES = ('user', 'userdebug', 'eng') @@ -43,104 +43,107 @@ BUILD_TYPES = ('user', 'userdebug', 'eng') def _GetBuildScript(os_version, bsp, build_type, out_dir=None, extra_make_args=None): - """Creates a shell script to run a platform build. + """Creates a shell script to run a platform build. - The script is small but uses several variables, which is why it's - easier just to build it here and run it directly. If the script - expands to be more complicated we probably want to move it into its - own file. + The script is small but uses several variables, which is why it's + easier just to build it here and run it directly. If the script + expands to be more complicated we probably want to move it into its + own file. - Args: - os_version: version of the Brillo OS to build. - bsp: name of the BSP to build. - build_type: one of BUILD_TYPES. - out_dir: directory for build output or None to build in-tree. - extra_make_args: additional make args as a list of strings or None. + Args: + os_version: version of the Brillo OS to build. + bsp: name of the BSP to build. + build_type: one of BUILD_TYPES. + out_dir: directory for build output or None to build in-tree. + extra_make_args: additional make args as a list of strings or None. - Returns: - A string shell script that will build the platform. - """ - make_args = [] + Returns: + A string shell script that will build the platform. + """ + make_args = [] - if out_dir is not None: - # Android build treats relative paths as relative to the source root, use - # abspath() to put it relative to the caller's current directory instead. - make_args.append('OUT_DIR="{}"'.format(os.path.abspath(out_dir))) + if out_dir is not None: + # Android build treats relative paths as relative to the source root, + # use abspath() to put it relative to the caller's current directory + # instead. + make_args.append('OUT_DIR="{}"'.format(os.path.abspath(out_dir))) - if extra_make_args: - make_args.extend(extra_make_args) + if extra_make_args: + make_args.extend(extra_make_args) - cmds = ['cd "{}"'.format(util.GetOSPath(os_version)), - '. build/envsetup.sh', - 'lunch "{}-{}"'.format(bsp, build_type), - 'm {}'.format(' '.join(make_args))] + cmds = ['cd "{}"'.format(util.GetOSPath(os_version)), + '. build/envsetup.sh', + 'lunch "{}-{}"'.format(bsp, build_type), + 'm {}'.format(' '.join(make_args))] - # Link all commands with && to exit immediately if one fails. - return ' && '.join(cmds) + # Link all commands with && to exit immediately if one fails. + return ' && '.join(cmds) def BuildPlatform(target, out_dir=None, extra_make_args=None): - """Builds the Brillo platform. - - Caller should validate target OS and Board before calling. - - Args: - target: core.parsing.target.Target to build. - out_dir: directory for build output or None to build in-tree. - extra_make_args: additional make args as a list of strings or None. - - Returns: - The build exit code. - - Raises: - BuildTypeError: target build_type is invalid. - BuildUtilityError: subprocess invocation fails. - """ - if target.build_type not in BUILD_TYPES: - raise BuildTypeError('Invalid build type: {} (must be one of {})'.format( - target.build_type, BUILD_TYPES)) - - build_script = _GetBuildScript(target.os_version, target.board, - target.build_type, - out_dir, extra_make_args) - - # Set up the build environment. We strip out most environment variables, - # but some values (e.g. USER, TERM) are useful or necessary for the build. - build_env = {var: os.environ[var] for var in tool.DEFAULT_PASSTHROUGH_ENV if - var in os.environ} - - # Make sure the output directory exists so we can log to it. - if not os.path.isdir(out_dir): - os.makedirs(out_dir) - log_file_path = os.path.join(out_dir, 'bdk_last_build.log') - - with target.get_device().linked(target.os_version): - # The build script uses bash features like && to link multiple commands and - # . to source scripts, so we invoke via bash. - try: - build_subprocess = subprocess.Popen(build_script, shell=True, - env=build_env, - executable='bash', - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - except OSError as e: - raise BuildUtilityError( - 'Failed to track a platform build. ' - 'Is "bash" installed, accessible, and in the PATH?: {}'.format(e)) - - # Piping a command to tee on the shell clobbers the exit code, so instead - # we make them two separate subprocesses and pipe them together. - try: - tee_subprocess = subprocess.Popen(['tee', log_file_path], env=build_env, - stdin=build_subprocess.stdout) - except OSError as e: - raise BuildUtilityError( - 'Failed to track a platform build. ' - 'Is "tee" installed, accessible, and in the PATH?: {}'.format(e)) - - # Close stdout to avoid pipes filling up and blocking forever if tee fails. - # https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline. - build_subprocess.stdout.close() - - tee_subprocess.communicate() - return build_subprocess.wait() + """Builds the Brillo platform. + + Caller should validate target OS and Board before calling. + + Args: + target: core.parsing.target.Target to build. + out_dir: directory for build output or None to build in-tree. + extra_make_args: additional make args as a list of strings or None. + + Returns: + The build exit code. + + Raises: + BuildTypeError: target build_type is invalid. + BuildUtilityError: subprocess invocation fails. + """ + if target.build_type not in BUILD_TYPES: + raise BuildTypeError( + 'Invalid build type: {} (must be one of {})'.format( + target.build_type, BUILD_TYPES)) + + build_script = _GetBuildScript(target.os_version, target.board, + target.build_type, out_dir, extra_make_args) + + # Set up the build environment. We strip out most environment variables, + # but some values (e.g. USER, TERM) are useful or necessary for the build. + build_env = {var: os.environ[var] for var in tool.DEFAULT_PASSTHROUGH_ENV + if var in os.environ} + + # Make sure the output directory exists so we can log to it. + if not os.path.isdir(out_dir): + os.makedirs(out_dir) + log_file_path = os.path.join(out_dir, 'bdk_last_build.log') + + with target.get_device().linked(target.os_version): + # The build script uses bash features like && to link multiple commands + # and to source scripts, so we invoke via bash. + try: + build_subprocess = subprocess.Popen(build_script, shell=True, + env=build_env, + executable='bash', + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + except OSError as e: + raise BuildUtilityError( + 'Failed to track a platform build. Is ' + '"bash" installed, accessible, and in the PATH?: {}'.format(e)) + + # Piping a command to tee on the shell clobbers the exit code, so + # instead we make them two separate subprocesses and pipe them together. + try: + tee_subprocess = subprocess.Popen(['tee', log_file_path], + env=build_env, + stdin=build_subprocess.stdout) + except OSError as e: + raise BuildUtilityError( + 'Failed to track a platform build. Is ' + '"tee" installed, accessible, and in the PATH?: {}'.format(e)) + + # Close stdout to avoid pipes filling up and blocking forever if tee + # fails. + # https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline. + build_subprocess.stdout.close() + + tee_subprocess.communicate() + return build_subprocess.wait() diff --git a/cli/lib/core/build_unittest.py b/cli/lib/core/build_unittest.py index fd38ac7..5fe4963 100644 --- a/cli/lib/core/build_unittest.py +++ b/cli/lib/core/build_unittest.py @@ -16,6 +16,7 @@ """Unit tests for build.py.""" + import os import unittest @@ -25,108 +26,112 @@ from core import util_stub from project import target_stub from test import stubs + class BuildPlatformTest(unittest.TestCase): - # Valid arguments listed here so we can test changing one at a time. - _BSP = 'test_board' - _BUILD_TYPE = 'user' - _OUT_DIR = '/foo/bar' - _OS_VERSION = '12.34' - _LOG_FILE = os.path.join(_OUT_DIR, 'bdk_last_build.log') - - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION) - self.stub_subprocess = stubs.StubSubprocess() - - # Allow BuildPlatform() to make the directory for the log file. - self.stub_os.should_makedirs = [self._OUT_DIR] - - build.os = self.stub_os - build.util = self.stub_util - build.subprocess = self.stub_subprocess - - self.device = device_stub.StubDevice(should_link_version=self._OS_VERSION) - self.target = target_stub.StubTarget( - board=self._BSP, build_type=self._BUILD_TYPE, - os_version=self._OS_VERSION, - device=self.device) - - def test_success(self): - make_command = self.stub_subprocess.AddCommand() - tee_command = self.stub_subprocess.AddCommand() - - self.assertEqual(0, build.BuildPlatform(self.target, self._OUT_DIR)) - - # pylint: disable=protected-access - make_command.AssertCallWas( - build._GetBuildScript(self.target.os_version, - self._BSP, self._BUILD_TYPE, self._OUT_DIR), - stdout=self.stub_subprocess.PIPE, stderr=self.stub_subprocess.STDOUT, - executable='bash', shell=True, env={}) - tee_command.AssertCallWas(['tee', self._LOG_FILE], - stdin=make_command.stdout, env={}) - - def test_success_extra_make_args(self): - """Tests passing additional make args from the user.""" - extra_make_args = ['-j', '40'] - make_command = self.stub_subprocess.AddCommand() - self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) - - self.assertEqual(0, build.BuildPlatform( - self.target, self._OUT_DIR, - extra_make_args=extra_make_args)) - - # pylint: disable=protected-access - make_command.AssertCallWas( - build._GetBuildScript(self.target.os_version, - self._BSP, self._BUILD_TYPE, self._OUT_DIR, - extra_make_args=extra_make_args), - stdout=self.stub_subprocess.PIPE, stderr=self.stub_subprocess.STDOUT, - executable='bash', shell=True, env={}) - - def test_success_env_passthrough(self): - """Tests passing needed environmental variables through.""" - self.stub_os.environ['USER'] = 'test_user' - self.stub_os.environ['PATH'] = 'test_path' - make_command = self.stub_subprocess.AddCommand() - tee_command = self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) - - self.assertEqual(0, build.BuildPlatform( - self.target, self._OUT_DIR)) - - # Make sure USER passes through but PATH does not. - make_command.AssertCallContained(env={'USER': 'test_user'}) - tee_command.AssertCallContained(env={'USER': 'test_user'}) - - def test_return_exit_code(self): - """Tests that the make exit code is returned.""" - self.stub_subprocess.AddCommand(ret_code=1) - self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) - - self.assertEqual(1, build.BuildPlatform( - self.target, self._OUT_DIR)) - - def test_invalid_build_type(self): - self.target.build_type = 'bad_build_type' - with self.assertRaises(build.BuildTypeError): - build.BuildPlatform(self.target, self._OUT_DIR) - - @staticmethod - def _raise_oserror(): - raise OSError('a faked OSError has occurred.') - - def test_missing_bash(self): - """Tests passing additional make args from the user.""" - # Expect a make command. - self.stub_subprocess.AddCommand(init_side_effect=self._raise_oserror) - with self.assertRaises(build.BuildUtilityError): - build.BuildPlatform(self.target, self._OUT_DIR) - - def test_missing_tee(self): - """Tests passing additional make args from the user.""" - # Expect a make command. - self.stub_subprocess.AddCommand() - self.stub_subprocess.AddCommand(['tee', self._LOG_FILE], - init_side_effect=self._raise_oserror) - with self.assertRaises(build.BuildUtilityError): - build.BuildPlatform(self.target, self._OUT_DIR) + # Valid arguments listed here so we can test changing one at a time. + _BSP = 'test_board' + _BUILD_TYPE = 'user' + _OUT_DIR = '/foo/bar' + _OS_VERSION = '12.34' + _LOG_FILE = os.path.join(_OUT_DIR, 'bdk_last_build.log') + + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION) + self.stub_subprocess = stubs.StubSubprocess() + + # Allow BuildPlatform() to make the directory for the log file. + self.stub_os.should_makedirs = [self._OUT_DIR] + + build.os = self.stub_os + build.util = self.stub_util + build.subprocess = self.stub_subprocess + + self.device = device_stub.StubDevice( + should_link_version=self._OS_VERSION) + self.target = target_stub.StubTarget( + board=self._BSP, build_type=self._BUILD_TYPE, + os_version=self._OS_VERSION, + device=self.device) + + def test_success(self): + make_command = self.stub_subprocess.AddCommand() + tee_command = self.stub_subprocess.AddCommand() + + self.assertEqual(0, build.BuildPlatform(self.target, self._OUT_DIR)) + + # pylint: disable=protected-access + make_command.AssertCallWas( + build._GetBuildScript(self.target.os_version, + self._BSP, self._BUILD_TYPE, self._OUT_DIR), + stdout=self.stub_subprocess.PIPE, + stderr=self.stub_subprocess.STDOUT, + executable='bash', shell=True, env={}) + tee_command.AssertCallWas(['tee', self._LOG_FILE], + stdin=make_command.stdout, env={}) + + def test_success_extra_make_args(self): + """Tests passing additional make args from the user.""" + extra_make_args = ['-j', '40'] + make_command = self.stub_subprocess.AddCommand() + self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) + + self.assertEqual(0, build.BuildPlatform( + self.target, self._OUT_DIR, + extra_make_args=extra_make_args)) + + # pylint: disable=protected-access + make_command.AssertCallWas( + build._GetBuildScript(self.target.os_version, + self._BSP, self._BUILD_TYPE, self._OUT_DIR, + extra_make_args=extra_make_args), + stdout=self.stub_subprocess.PIPE, + stderr=self.stub_subprocess.STDOUT, + executable='bash', shell=True, env={}) + + def test_success_env_passthrough(self): + """Tests passing needed environmental variables through.""" + self.stub_os.environ['USER'] = 'test_user' + self.stub_os.environ['PATH'] = 'test_path' + make_command = self.stub_subprocess.AddCommand() + tee_command = self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) + + self.assertEqual(0, build.BuildPlatform( + self.target, self._OUT_DIR)) + + # Make sure USER passes through but PATH does not. + make_command.AssertCallContained(env={'USER': 'test_user'}) + tee_command.AssertCallContained(env={'USER': 'test_user'}) + + def test_return_exit_code(self): + """Tests that the make exit code is returned.""" + self.stub_subprocess.AddCommand(ret_code=1) + self.stub_subprocess.AddCommand(['tee', self._LOG_FILE]) + + self.assertEqual(1, build.BuildPlatform( + self.target, self._OUT_DIR)) + + def test_invalid_build_type(self): + self.target.build_type = 'bad_build_type' + with self.assertRaises(build.BuildTypeError): + build.BuildPlatform(self.target, self._OUT_DIR) + + @staticmethod + def _raise_oserror(): + raise OSError('a faked OSError has occurred.') + + def test_missing_bash(self): + """Tests passing additional make args from the user.""" + # Expect a make command. + self.stub_subprocess.AddCommand(init_side_effect=self._raise_oserror) + with self.assertRaises(build.BuildUtilityError): + build.BuildPlatform(self.target, self._OUT_DIR) + + def test_missing_tee(self): + """Tests passing additional make args from the user.""" + # Expect a make command. + self.stub_subprocess.AddCommand() + self.stub_subprocess.AddCommand(['tee', self._LOG_FILE], + init_side_effect=self._raise_oserror) + with self.assertRaises(build.BuildUtilityError): + build.BuildPlatform(self.target, self._OUT_DIR) diff --git a/cli/lib/core/config.py b/cli/lib/core/config.py index c64f876..cd232ad 100644 --- a/cli/lib/core/config.py +++ b/cli/lib/core/config.py @@ -20,182 +20,181 @@ import os import sqlite3 import string -import uuid from core import properties -from core import util class Store(properties.PropBase): - """config.Store - A sqlite3-backed dict interface for storing persistent data. + """A sqlite3-backed dict interface for storing persistent data. + Usage: - cd = config.Store('data.db') - cs['mykey'] = 'my val' - print cs['mykey'] - 'mykey' - """ - - PREFIX = 'data_' - CACHING = True - - def __init__(self, file_path, table='data'): - super(Store, self).__init__() - self._conn = None - self._path = file_path - self._table = table - self._initialized = False - - def _setup(self): - if self._initialized: - return True - if self._path != ':memory:': - db_dir = os.path.dirname(self._path) - if not os.path.isdir(db_dir): - os.makedirs(db_dir) - self._conn = sqlite3.connect(self._path); - self._conn.execute('create table if not exists %s ' - '(key text PRIMARY KEY, val text)' % self._table) - self._initialized = True - - def dict(self): - """Dumps the entire table as a dict.""" - if not self._initialized: - self._setup() - c = self._conn.cursor() - d = {} - for row in c.execute("select key, val from %s" % self._table): - d[row[0]] = row[1] - return d - - def _Get(self, key): - if not self._initialized: - self._setup() - c = self._conn.cursor() - c.execute("select val from %s where key=?" % self._table, (key, )) - ret = c.fetchone() - if ret: - return ret[0] - return None - - def _Set(self, key, val): - if not self._initialized: - self._setup() - c = self._conn.cursor() - c.execute("insert or replace into %s values (?, ?)" % self._table, - (key, val)) - self._conn.commit() - return val - - def _load(self, key): - if key in self.properties(): - key = self.PREFIX + key - return self._Get(key) - - def _save(self, key, value): - if key in self.properties(): - key = self.PREFIX + key - return self._Set(key, value) + cd = config.Store('data.db') + cs['mykey'] = 'my val' + print cs['mykey'] + 'mykey' + """ + + PREFIX = 'data_' + CACHING = True + + def __init__(self, file_path, table='data'): + super(Store, self).__init__() + self._conn = None + self._path = file_path + self._table = table + self._initialized = False + + def _setup(self): + if self._initialized: + return True + if self._path != ':memory:': + db_dir = os.path.dirname(self._path) + if not os.path.isdir(db_dir): + os.makedirs(db_dir) + self._conn = sqlite3.connect(self._path) + self._conn.execute('create table if not exists %s ' + '(key text PRIMARY KEY, val text)' % self._table) + self._initialized = True + + def dict(self): + """Dumps the entire table as a dict.""" + if not self._initialized: + self._setup() + c = self._conn.cursor() + d = {} + for row in c.execute("select key, val from %s" % self._table): + d[row[0]] = row[1] + return d + + def _Get(self, key): + if not self._initialized: + self._setup() + c = self._conn.cursor() + c.execute("select val from %s where key=?" % self._table, (key, )) + ret = c.fetchone() + if ret: + return ret[0] + return None + + def _Set(self, key, val): + if not self._initialized: + self._setup() + c = self._conn.cursor() + c.execute("insert or replace into %s values (?, ?)" % self._table, + (key, val)) + self._conn.commit() + return val + + def _load(self, key): + if key in self.properties(): + key = self.PREFIX + key + return self._Get(key) + + def _save(self, key, value): + if key in self.properties(): + key = self.PREFIX + key + return self._Set(key, value) class DictStore(properties.PropBase): - """config.DictStore - A dict-backed store primarily used for in-memory replacement for other - stores in tests. - """ - REQUIRED_PROPS = {} - OPTIONAL_PROPS = {} + """A dict-backed store primarily used for in-memory replacement for other + stores in tests. + """ + REQUIRED_PROPS = {} + OPTIONAL_PROPS = {} - def __init__(self): - super(DictStore, self).__init__() - self._d = {} + def __init__(self): + super(DictStore, self).__init__() + self._d = {} - def _save(self, key, value): - self._d[key] = value - return value + def _save(self, key, value): + self._d[key] = value + return value - def _load(self, key): - if not key in self._d: - return '' - return self._d[key] + def _load(self, key): + if not key in self._d: + return '' + return self._d[key] class FileStore(properties.PropBase): - """config.FileStore - A file-backed dict interface for storing persistent data. + """A file-backed dict interface for storing persistent data. Usage: - cd = config.FileStore(os.path.join(PRODUCT_DIR, 'data_dir')) - cd['myprefix/mykey'] = 'my val' - print cd['myprefix/mykey'] - 'my val' - """ - - CACHING = True - - def __init__(self, path_prefix='config'): - self._prefix = path_prefix - super(FileStore, self).__init__() - - def _remove_comments(self, string_): - """Returns |string_| with comments removed and whitespace adjusted. - - Removes whitespace from beginning and end of all lines, - newline characters are replaced with ' '. + cd = config.FileStore(os.path.join(PRODUCT_DIR, 'data_dir')) + cd['myprefix/mykey'] = 'my val' + print cd['myprefix/mykey'] + 'my val' """ - value = '' - lines = string_.split('\n') - for line in lines: - stripped_line = line.strip() - if len(stripped_line) > 0 and stripped_line[0] == '#': - continue - value += line + ' ' - return value.strip() - - def _load_from_file(self, keyfile): - """Reads in the contents of |keyfile| stripping out comments and newlines""" - value = '' - if not os.path.exists(keyfile): - return value - with open(keyfile, 'r') as f: - value = f.read() - return self._remove_comments(value) - - def _save_to_file(self, keyfile, value): - """Write |value| to |keyfile|""" - if type(value) != str: - raise TypeError, 'value must be a str' - if not os.path.exists(os.path.dirname(keyfile)): - os.makedirs(os.path.dirname(keyfile)) - with open(keyfile, 'w') as f: - # Note, if comments are written, they will not be read back in! - f.write(value) - return self._remove_comments(value) - - def _key_to_path(self, key): - """Converts a key of [key0/[key2/[.../]]key to a host-compatible path""" - return os.path.join(self._prefix, *string.split(key, '/')) - - def _load(self, key): - """Returns the value for |key|""" - keyfile = self._key_to_path(key) - return self._load_from_file(keyfile) - - def _save(self, key, value): - """Assignes |value| to |key|""" - keyfile = self._key_to_path(key) - return self._save_to_file(keyfile, value) + + CACHING = True + + def __init__(self, path_prefix='config'): + self._prefix = path_prefix + super(FileStore, self).__init__() + + def _remove_comments(self, string_): + """Returns |string_| with comments removed and whitespace adjusted. + + Removes whitespace from beginning and end of all lines, + newline characters are replaced with ' '. + """ + value = '' + lines = string_.split('\n') + for line in lines: + stripped_line = line.strip() + if len(stripped_line) > 0 and stripped_line[0] == '#': + continue + value += line + ' ' + return value.strip() + + def _load_from_file(self, keyfile): + """Reads in the contents of |keyfile| stripping out comments and + newlines + """ + value = '' + if not os.path.exists(keyfile): + return value + with open(keyfile, 'r') as f: + value = f.read() + return self._remove_comments(value) + + def _save_to_file(self, keyfile, value): + """Write |value| to |keyfile|""" + if type(value) != str: + raise TypeError, 'value must be a str' + if not os.path.exists(os.path.dirname(keyfile)): + os.makedirs(os.path.dirname(keyfile)) + with open(keyfile, 'w') as f: + # Note, if comments are written, they will not be read back in! + f.write(value) + return self._remove_comments(value) + + def _key_to_path(self, key): + """Converts a key of [key0/[key2/[.../]]key to a host-compatible path""" + return os.path.join(self._prefix, *string.split(key, '/')) + + def _load(self, key): + """Returns the value for |key|""" + keyfile = self._key_to_path(key) + return self._load_from_file(keyfile) + + def _save(self, key, value): + """Assignes |value| to |key|""" + keyfile = self._key_to_path(key) + return self._save_to_file(keyfile, value) class ProductFileStore(FileStore): - REQUIRED_PROPS = {'name': [], 'brand': [], 'device': [], 'manufacturer': []} - OPTIONAL_PROPS = { - 'bdk/version': [], 'bdk/buildtype': ['eng', 'user', 'userdebug'], - 'bdk/java': ['0', '1'], 'bdk/allowed_environ': [], - 'brillo/product_id': [], 'copy_files': [], - 'brillo/crash_server': [], 'packages': []} - - def __init__(self, product_path): - if product_path is None: - raise ValueError, 'The product path must not be None.' - super(ProductFileStore, self).__init__(os.path.join(product_path, 'config')) + REQUIRED_PROPS = {'name': [], 'brand': [], 'device': [], 'manufacturer': []} + OPTIONAL_PROPS = { + 'bdk/version': [], 'bdk/buildtype': ['eng', 'user', 'userdebug'], + 'bdk/java': ['0', '1'], 'bdk/allowed_environ': [], + 'brillo/product_id': [], 'copy_files': [], + 'brillo/crash_server': [], 'packages': []} + + def __init__(self, product_path): + if product_path is None: + raise ValueError, 'The product path must not be None.' + super(ProductFileStore, self).__init__(os.path.join(product_path, + 'config')) diff --git a/cli/lib/core/config_stub.py b/cli/lib/core/config_stub.py index 2644bcb..9131515 100644 --- a/cli/lib/core/config_stub.py +++ b/cli/lib/core/config_stub.py @@ -18,35 +18,35 @@ class StubConfig(object): - """Stubs the core.config module. + """Stubs the core.config module. - Attributes: - user_store: the StubUserStore object that will be passed to the - code under test. Modify this to get the desired behavior. - """ + Attributes: + user_store: the StubUserStore object that will be passed to the + code under test. Modify this to get the desired behavior. + """ - def __init__(self, **kwargs): - """Initializes a StubConfig object. + def __init__(self, **kwargs): + """Initializes a StubConfig object. - Args: - kwargs: passed through to StubUserStore init. - """ - self.user_store = self.StubUserStore(**kwargs) + Args: + kwargs: passed through to StubUserStore init. + """ + self.user_store = self.StubUserStore(**kwargs) - def UserStore(self, *_args, **_kwargs): - """Returns our user_store object.""" - return self.user_store + def UserStore(self, *_args, **_kwargs): + """Returns our user_store object.""" + return self.user_store - class StubUserStore(object): - """Stubs the config.UserStore class.""" + class StubUserStore(object): + """Stubs the config.UserStore class.""" - def __init__(self, metrics_opt_in='1', is_complete=True): - self.metrics_opt_in = metrics_opt_in - self.is_complete = is_complete - self.uid = 0 + def __init__(self, metrics_opt_in='1', is_complete=True): + self.metrics_opt_in = metrics_opt_in + self.is_complete = is_complete + self.uid = 0 - def initialize(self, *_args, **_kwargs): - pass + def initialize(self, *_args, **_kwargs): + pass - def complete(self): - return self.is_complete + def complete(self): + return self.is_complete diff --git a/cli/lib/core/config_unittest.py b/cli/lib/core/config_unittest.py index 8daae9f..7145cfe 100644 --- a/cli/lib/core/config_unittest.py +++ b/cli/lib/core/config_unittest.py @@ -25,119 +25,119 @@ from test import stubs class ProductFileStoreTest(unittest.TestCase): - def setUp(self): - self.product_path = 'product_path' - stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(stub_os) - config.open = self.stub_open.open - config.os = stub_os - self.store = config.ProductFileStore(self.product_path) - - def test_load_base(self): - brand_path = 'product_path/config/brand' - self.stub_open.files[brand_path] = stubs.StubFile('my-brand') - config.os.path.should_exist.append(brand_path) - self.assertEqual(self.store.brand, 'my-brand') - - def test_load_deep(self): - version_path = 'product_path/config/bdk/version' - expected = 'veggies.1.2.3.4' - self.stub_open.files[version_path] = stubs.StubFile(expected) - config.os.path.should_exist.append(version_path) - self.assertEqual(self.store.bdk.version, expected) - - def test_load_comments(self): - expected = 'veggies.1.2.3.4' - input_ = """ + def setUp(self): + self.product_path = 'product_path' + stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(stub_os) + config.open = self.stub_open.open + config.os = stub_os + self.store = config.ProductFileStore(self.product_path) + + def test_load_base(self): + brand_path = 'product_path/config/brand' + self.stub_open.files[brand_path] = stubs.StubFile('my-brand') + config.os.path.should_exist.append(brand_path) + self.assertEqual(self.store.brand, 'my-brand') + + def test_load_deep(self): + version_path = 'product_path/config/bdk/version' + expected = 'veggies.1.2.3.4' + self.stub_open.files[version_path] = stubs.StubFile(expected) + config.os.path.should_exist.append(version_path) + self.assertEqual(self.store.bdk.version, expected) + + def test_load_comments(self): + expected = 'veggies.1.2.3.4' + input_ = """ # Current version: name.num.bers %s # That's it! """ % expected - f = stubs.StubFile(input_) - self.stub_open.files['product_path/config/bdk/version'] = f - config.os.path.should_exist = ['product_path/config/bdk', - 'product_path/config/bdk/version'] - self.assertEqual(self.store.bdk.version, expected) - self.assertEqual(f.contents, string.split(input_, '\n')) - - def test_load_multiline(self): - expected = 'ledflasher ledservice' - input_ = """ + f = stubs.StubFile(input_) + self.stub_open.files['product_path/config/bdk/version'] = f + config.os.path.should_exist = ['product_path/config/bdk', + 'product_path/config/bdk/version'] + self.assertEqual(self.store.bdk.version, expected) + self.assertEqual(f.contents, string.split(input_, '\n')) + + def test_load_multiline(self): + expected = 'ledflasher ledservice' + input_ = """ ledflasher ledservice """ - f = stubs.StubFile(input_) - self.stub_open.files['product_path/config/packages'] = f - config.os.path.should_exist = ['product_path/config', - 'product_path/config/packages'] - self.assertEqual(self.store.packages, expected) - self.assertEqual(f.contents, string.split(input_, '\n')) - - def test_load_invalid(self): - with self.assertRaises(AttributeError): - _ = self.store.no.real.thing - - def test_save_exists(self): - f = stubs.StubFile() - self.stub_open.files['product_path/config/bdk/version'] = f - config.os.path.should_exist += ['product_path/config/bdk', - 'product_path/config/bdk/version'] - expected = 'everything awesome' - self.store.bdk.version = expected - self.assertEqual(f.contents, [expected]) - - def test_save_makedirs_fail(self): - config.os.path.should_exist = [] - with self.assertRaises(OSError): - self.store.bdk.version = 'dontmatter' - - def test_save_makedirs_works(self): - f = stubs.StubFile() - self.stub_open.files['product_path/config/bdk/version'] = f - config.os.should_makedirs = ['product_path/config/bdk'] - expected = 'everything awesome' - self.store.bdk.version = expected - self.assertEqual(f.contents, [expected]) - - def test_save_valid_buildtype(self): - f = stubs.StubFile() - self.stub_open.files['product_path/config/bdk/buildtype'] = f - config.os.should_makedirs = ['product_path/config/bdk'] - expected = 'userdebug' - with self.assertRaises(ValueError): - self.store.bdk.buildtype = 'not legit' - self.store.bdk.buildtype = expected - self.assertEqual(f.contents, [expected]) - - def test_save_load_comments(self): - f = stubs.StubFile() - self.stub_open.files['product_path/config/packages'] = f - config.os.path.should_exist = ['product_path/config', - 'product_path/config/packages'] - input_ = """ + f = stubs.StubFile(input_) + self.stub_open.files['product_path/config/packages'] = f + config.os.path.should_exist = ['product_path/config', + 'product_path/config/packages'] + self.assertEqual(self.store.packages, expected) + self.assertEqual(f.contents, string.split(input_, '\n')) + + def test_load_invalid(self): + with self.assertRaises(AttributeError): + _ = self.store.no.real.thing + + def test_save_exists(self): + f = stubs.StubFile() + self.stub_open.files['product_path/config/bdk/version'] = f + config.os.path.should_exist += ['product_path/config/bdk', + 'product_path/config/bdk/version'] + expected = 'everything awesome' + self.store.bdk.version = expected + self.assertEqual(f.contents, [expected]) + + def test_save_makedirs_fail(self): + config.os.path.should_exist = [] + with self.assertRaises(OSError): + self.store.bdk.version = 'dontmatter' + + def test_save_makedirs_works(self): + f = stubs.StubFile() + self.stub_open.files['product_path/config/bdk/version'] = f + config.os.should_makedirs = ['product_path/config/bdk'] + expected = 'everything awesome' + self.store.bdk.version = expected + self.assertEqual(f.contents, [expected]) + + def test_save_valid_buildtype(self): + f = stubs.StubFile() + self.stub_open.files['product_path/config/bdk/buildtype'] = f + config.os.should_makedirs = ['product_path/config/bdk'] + expected = 'userdebug' + with self.assertRaises(ValueError): + self.store.bdk.buildtype = 'not legit' + self.store.bdk.buildtype = expected + self.assertEqual(f.contents, [expected]) + + def test_save_load_comments(self): + f = stubs.StubFile() + self.stub_open.files['product_path/config/packages'] = f + config.os.path.should_exist = ['product_path/config', + 'product_path/config/packages'] + input_ = """ # Running service: ledservice # Debugging app ledprobe """ - self.store.packages = input_ - self.assertEqual(string.join(f.contents, '\n'), input_) - expected = 'ledservice ledprobe' - self.assertEqual(self.store.packages, expected) - - def test_dict(self): - f = stubs.StubFile() - self.stub_open.files['product_path/config/packages'] = f - config.os.path.should_exist = ['product_path/config', - 'product_path/config/packages'] - input_ = """ + self.store.packages = input_ + self.assertEqual(string.join(f.contents, '\n'), input_) + expected = 'ledservice ledprobe' + self.assertEqual(self.store.packages, expected) + + def test_dict(self): + f = stubs.StubFile() + self.stub_open.files['product_path/config/packages'] = f + config.os.path.should_exist = ['product_path/config', + 'product_path/config/packages'] + input_ = """ # Running service: ledservice # Debugging app ledprobe """ - self.store.packages = input_ - self.assertEqual(string.join(f.contents, '\n'), input_) - expected = 'ledservice ledprobe' - self.assertEqual(self.store.dict()['packages'], expected) + self.store.packages = input_ + self.assertEqual(string.join(f.contents, '\n'), input_) + expected = 'ledservice ledprobe' + self.assertEqual(self.store.dict()['packages'], expected) diff --git a/cli/lib/core/image_build.py b/cli/lib/core/image_build.py index 873b6ee..4088f50 100644 --- a/cli/lib/core/image_build.py +++ b/cli/lib/core/image_build.py @@ -37,338 +37,344 @@ IMAGE_TYPES = [IMAGE_TYPE_ODM, IMAGE_TYPE_SYSTEM] class Error(error.Error): - """General build failure.""" + """General build failure.""" class PathError(Error): - """Raised when a provided path does not meet expectations.""" + """Raised when a provided path does not meet expectations.""" class ImageTypeError(Error): - """Raise when an unknown image type is seen.""" + """Raise when an unknown image type is seen.""" def _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools, build_tools): - """Checks that all paths necessary for an image build meet expectations. - - Does not validate the contents of any paths. - Will create an empty output_dir if it doesn't already exist. - - Args: - cache_dir: Directory where user files and system files are merged. - (This is normally per-target, not the global cache dir.) - output_dir: Directory where image build output should be placed. - product_out: Directory where the built platform can be found. - host_tools: Directory where host tools can be found. - build_tools: Directory where build tools can be found. - - Raises: - PathError: An expected directory cannot be found or isn't a dir. - """ - if not os.path.isdir(cache_dir): - raise PathError('Cache dir "{}" is not a directory.'.format(cache_dir)) - if os.path.isfile(output_dir): - raise PathError('Can not create output dir "{}", is a file'.format( - output_dir)) - if not os.path.isdir(product_out): - raise PathError( - 'Could not find platform build output "{}". ' - 'Use `bdk build platform` to build it.'.format( - product_out)) - if not os.path.isdir(host_tools): - # Probably tampered with/not built. - raise PathError( - 'Could not find host tools directory ({}). ' - 'Use `bdk build platform` to build it.'.format(host_tools)) - if not os.path.isdir(build_tools): - # This should only be missing if it was tampered with. - raise PathError( - 'Could not find build tools directory ({}). ' - 'You may need to re-download the OS version.'.format( - build_tools)) - - if not os.path.isdir(output_dir): - os.makedirs(output_dir) + """Checks that all paths necessary for an image build meet expectations. + + Does not validate the contents of any paths. + Will create an empty output_dir if it doesn't already exist. + + Args: + cache_dir: Directory where user files and system files are merged. + (This is normally per-target, not the global cache dir.) + output_dir: Directory where image build output should be placed. + product_out: Directory where the built platform can be found. + host_tools: Directory where host tools can be found. + build_tools: Directory where build tools can be found. + + Raises: + PathError: An expected directory cannot be found or isn't a dir. + """ + if not os.path.isdir(cache_dir): + raise PathError('Cache dir "{}" is not a directory.'.format(cache_dir)) + if os.path.isfile(output_dir): + raise PathError('Can not create output dir "{}", is a file'.format( + output_dir)) + if not os.path.isdir(product_out): + raise PathError( + 'Could not find platform build output "{}". ' + 'Use `bdk build platform` to build it.'.format( + product_out)) + if not os.path.isdir(host_tools): + # Probably tampered with/not built. + raise PathError( + 'Could not find host tools directory ({}). ' + 'Use `bdk build platform` to build it.'.format(host_tools)) + if not os.path.isdir(build_tools): + # This should only be missing if it was tampered with. + raise PathError( + 'Could not find build tools directory ({}). ' + 'You may need to re-download the OS version.'.format( + build_tools)) + + if not os.path.isdir(output_dir): + os.makedirs(output_dir) def AddTargetOsPacks(spec, target): - """Adds target OS packs to the spec. - - Args: - spec: project.project_spec.ProjectSpec to add the OS packs to. - target: project.target.Target object to get OS info from. - """ - platform_dir = target.platform_build_cache() - product_out = util.GetAndroidProductOut(platform_dir, target.board) - os_packs = packs.Packs() - os_packs.namespace = target.os_namespace - os_core = pack.Pack(os_packs.namespace, 'generated_system') - os_core.add_provides('os.core') - base_copy = pack.Copy(os_core) - base_copy.src = os.path.join(product_out, 'system') - base_copy.src_type = pack.CopyType.DIR - base_copy.recurse = True - base_copy.dst = '/system' - base_copy.dst_type = pack.CopyType.DIR - # Do not generate fs_config_* or file_context_* for this pack. - base_copy.acl.override_build = False - os_core.add_copy(base_copy) - os_packs.add_pack(os_core) - spec.add_packs(os_packs) + """Adds target OS packs to the spec. + + Args: + spec: project.project_spec.ProjectSpec to add the OS packs to. + target: project.target.Target object to get OS info from. + """ + platform_dir = target.platform_build_cache() + product_out = util.GetAndroidProductOut(platform_dir, target.board) + os_packs = packs.Packs() + os_packs.namespace = target.os_namespace + os_core = pack.Pack(os_packs.namespace, 'generated_system') + os_core.add_provides('os.core') + base_copy = pack.Copy(os_core) + base_copy.src = os.path.join(product_out, 'system') + base_copy.src_type = pack.CopyType.DIR + base_copy.recurse = True + base_copy.dst = '/system' + base_copy.dst_type = pack.CopyType.DIR + # Do not generate fs_config_* or file_context_* for this pack. + base_copy.acl.override_build = False + os_core.add_copy(base_copy) + os_packs.add_pack(os_core) + spec.add_packs(os_packs) def CreateTargetCache(spec, target, cache_dir, mountpoint='/', update=True, verbose=True): - """Copies all files specified in the target to a single cache dir. - - This call will create a submap from the parent spec and then use the - resulting copy destinations to populate the supplied path as if it were - the root of the destination. - - In the future, non-os and non-board packs will likely end up in their - own image (not /system), so this will create that cache ignoring any - Copy() nodes owned by a Pack that is prefixed by the target os or board. - - Args: - spec: project.ProjectSpec global specification - target: project.target.Target to cache - cache_dir: path to place the root/ of copied files. - mountpoint: optional path prefix to exclusively cache. - update: optionally, only replaces files that have changed. - verbose: (optional). If True, print information about what's - happening. Default True. - - Returns: - dict of uncached { dst_path => copy } - - Raises: - project.dependency.Error: If there is an unfulfilled dependency. - PathError: If a required path is missing. - """ - target_map = target.create_submap(spec.packmap) - - cache_root = os.path.join(cache_dir, 'root') - cache = sysroot.Sysroot(cache_root, copy_newer_only=update) - fs_config_files = {} - fs_config_dirs = {} - file_context = set() - uncached = {} - - # pack.Copy._reconcile_paths() *should* make these always what is seen here. - # TODO(wad): Add a <link> node. - allowed_copies = [ - (pack.CopyType.FILE, pack.CopyType.FILE, False), - (pack.CopyType.GLOB, pack.CopyType.DIR, False), - (pack.CopyType.GLOB, pack.CopyType.DIR, True), - (pack.CopyType.DIR, pack.CopyType.DIR, False), - (pack.CopyType.DIR, pack.CopyType.DIR, True), - ] - - final_files = {} - for destination, copy in target_map.copy_destinations.iteritems(): - copy = copy[0] # deduping has already occurred. - - # Currently, skip unknown mountpoints. - if not destination.startswith(mountpoint): - uncached[destination] = copy - continue - - # All destinations are required to be absolute. - # Make them relative so they play nicely with Sysroot(). - destination = destination.lstrip('/') - - # Proactively check allowed combinations so that the error - # handling isn't interleaved with logic. - if (copy.src_type, copy.dst_type, copy.recurse) not in allowed_copies: - # TODO(wad): Should these be FileABugError()s? - raise PathError( - '{}: impossible copy reached: {}'.format(copy.pack.origin, copy)) - - # TODO(arihc): Will os packs ever draw in files from BSPs - # (other than build output files)? If so, need to link BSP to OS. - files = set() - try: - if copy.src_type == pack.CopyType.FILE: # FILE -> FILE - cache.Makedirs(os.path.dirname(destination)) - if os.path.islink(copy.src): - # TODO(wad): Resolve how to skip permission setting on symlinks. - cache.AddSymlink(copy.src, destination) - else: - cache.AddFile(copy.src, destination) - files.add(destination) - elif copy.src_type == pack.CopyType.GLOB: # GLOB -> DIR - files.update(cache.AddGlob(copy.src, destination, recurse=copy.recurse)) - elif copy.src_type == pack.CopyType.DIR: # DIR -> DIR - files.update(cache.AddDir(copy.src, destination, - recurse=copy.recurse, symlinks=True)) - fs_config_dirs[destination] = copy.acl.fs_config(binary=True) - for f in files: - # Even though the fs_config_files format support globs, the final list - # of files is known at this point. - fs_config_files[f] = copy.acl.fs_config(path=f, binary=True) - if copy.acl.selabel: - file_context.update([copy.acl.file_context(path=f)]) - final_files[os.path.sep + f] = copy - - except IOError as e: - # Annotate the error with the line that where the <copy> is defined. - raise PathError('{}: {}'.format(copy.origin, e)) - - # Walk the cache tree and remove any unmanaged dirs. First - # we compute all allowed subpaths from the list of copied files, then - # walk the cache tree and remove any unknown files and directories. - final_dirs = set('/') - for dst, copy in final_files.iteritems(): - # Final files is always a full path so let's drop the file. - dst = os.path.dirname(dst) - while dst != os.path.sep: - # TODO(b/27848879) Add <dir> node set-acl support. - tmp_acl = pack.Copy(None, dst).acl - tmp_acl.perms = '0555' # Default to rx for dirs. - tmp_acl.override_build = copy.acl.override_build - if dst not in fs_config_dirs: - fs_config_dirs[dst] = '{}'.format(tmp_acl.fs_config(binary=True)) - # The default root map covers file_context for now. - final_dirs.add(dst) - dst = os.path.dirname(dst) - - all_paths = set() - all_files = set() - # Walk the tree as quickly as possible with os.walk. - # Use set differencing to compute the unmanaged entries. - for root, _, files in os.walk(cache_root): - dst_path = root[len(cache_root):] or os.path.sep - # Don't touch unmanaged files outside of the given mountpoint. - if not dst_path.startswith(mountpoint): - continue - all_paths.add(dst_path) - all_files.update(set([os.path.join(dst_path, f) for f in files])) - unmanaged_file = all_files.difference(final_files.keys()) - unmanaged_path = all_paths.difference(final_dirs) - - if verbose and (len(unmanaged_file) or len(unmanaged_path)): - print 'Removing unmanaged paths:\n {}'.format( - '\n '.join(sorted(unmanaged_file.union(unmanaged_path)))) - for f in unmanaged_file: - os.remove(os.path.join(cache_root, f.lstrip(os.path.sep))) - for p in reversed(sorted(unmanaged_path)): - os.rmdir(os.path.join(cache_root, p.lstrip(os.path.sep))) - - # Sort the list from most specific to least specific. - dsts = sorted(fs_config_dirs.keys(), key=lambda k: k.count(os.path.sep), - reverse=True) - sorted_dirs = list() - for dst in dsts: - sorted_dirs.append(fs_config_dirs[dst]) - - # The sysroot was nestled one deep at root/, so meta files - # can be stored in the parent without a risk of conflict. - if not os.path.isdir(os.path.join(cache_dir, 'etc')): - os.makedirs(os.path.join(cache_dir, 'etc')) - with open(os.path.join(cache_dir, 'etc', 'fs_config_files'), 'w') as f: - f.write(''.join(fs_config_files.values())) - with open(os.path.join(cache_dir, 'etc', 'fs_config_dirs'), 'w') as f: - f.write(''.join(sorted_dirs)) - with open(os.path.join(cache_dir, 'file_contexts'), 'w') as f: - f.write('\n'.join(file_context)) - - return uncached + """Copies all files specified in the target to a single cache dir. + + This call will create a submap from the parent spec and then use the + resulting copy destinations to populate the supplied path as if it were + the root of the destination. + + In the future, non-os and non-board packs will likely end up in their + own image (not /system), so this will create that cache ignoring any + Copy() nodes owned by a Pack that is prefixed by the target os or board. + + Args: + spec: project.ProjectSpec global specification + target: project.target.Target to cache + cache_dir: path to place the root/ of copied files. + mountpoint: optional path prefix to exclusively cache. + update: optionally, only replaces files that have changed. + verbose: (optional). If True, print information about what's + happening. Default True. + + Returns: + dict of uncached { dst_path => copy } + + Raises: + project.dependency.Error: If there is an unfulfilled dependency. + PathError: If a required path is missing. + """ + target_map = target.create_submap(spec.packmap) + + cache_root = os.path.join(cache_dir, 'root') + cache = sysroot.Sysroot(cache_root, copy_newer_only=update) + fs_config_files = {} + fs_config_dirs = {} + file_context = set() + uncached = {} + + # pack.Copy._reconcile_paths() *should* make these always what is seen here. + # TODO(wad): Add a <link> node. + allowed_copies = [ + (pack.CopyType.FILE, pack.CopyType.FILE, False), + (pack.CopyType.GLOB, pack.CopyType.DIR, False), + (pack.CopyType.GLOB, pack.CopyType.DIR, True), + (pack.CopyType.DIR, pack.CopyType.DIR, False), + (pack.CopyType.DIR, pack.CopyType.DIR, True), + ] + + final_files = {} + for destination, copy in target_map.copy_destinations.iteritems(): + copy = copy[0] # deduping has already occurred. + + # Currently, skip unknown mountpoints. + if not destination.startswith(mountpoint): + uncached[destination] = copy + continue + + # All destinations are required to be absolute. + # Make them relative so they play nicely with Sysroot(). + destination = destination.lstrip('/') + + # Proactively check allowed combinations so that the error + # handling isn't interleaved with logic. + if (copy.src_type, copy.dst_type, copy.recurse) not in allowed_copies: + # TODO(wad): Should these be FileABugError()s? + raise PathError( + '{}: impossible copy reached: {}'.format(copy.pack.origin, + copy)) + + # TODO(arihc): Will os packs ever draw in files from BSPs + # (other than build output files)? If so, need to link BSP to OS. + files = set() + try: + if copy.src_type == pack.CopyType.FILE: # FILE -> FILE + cache.Makedirs(os.path.dirname(destination)) + if os.path.islink(copy.src): + # TODO(wad): Resolve how to skip permission setting on + # symlinks. + cache.AddSymlink(copy.src, destination) + else: + cache.AddFile(copy.src, destination) + files.add(destination) + elif copy.src_type == pack.CopyType.GLOB: # GLOB -> DIR + files.update(cache.AddGlob(copy.src, destination, + recurse=copy.recurse)) + elif copy.src_type == pack.CopyType.DIR: # DIR -> DIR + files.update(cache.AddDir(copy.src, destination, + recurse=copy.recurse, symlinks=True)) + fs_config_dirs[destination] = copy.acl.fs_config(binary=True) + for f in files: + # Even though the fs_config_files format support globs, the + # final list of files is known at this point. + fs_config_files[f] = copy.acl.fs_config(path=f, binary=True) + if copy.acl.selabel: + file_context.update([copy.acl.file_context(path=f)]) + final_files[os.path.sep + f] = copy + + except IOError as e: + # Annotate the error with the line that where the <copy> is defined. + raise PathError('{}: {}'.format(copy.origin, e)) + + # Walk the cache tree and remove any unmanaged dirs. First + # we compute all allowed subpaths from the list of copied files, then + # walk the cache tree and remove any unknown files and directories. + final_dirs = set('/') + for dst, copy in final_files.iteritems(): + # Final files is always a full path so let's drop the file. + dst = os.path.dirname(dst) + while dst != os.path.sep: + # TODO(b/27848879) Add <dir> node set-acl support. + tmp_acl = pack.Copy(None, dst).acl + tmp_acl.perms = '0555' # Default to rx for dirs. + tmp_acl.override_build = copy.acl.override_build + if dst not in fs_config_dirs: + fs_config_dirs[dst] = '{}'.format( + tmp_acl.fs_config(binary=True)) + # The default root map covers file_context for now. + final_dirs.add(dst) + dst = os.path.dirname(dst) + + all_paths = set() + all_files = set() + # Walk the tree as quickly as possible with os.walk. + # Use set differencing to compute the unmanaged entries. + for root, _, files in os.walk(cache_root): + dst_path = root[len(cache_root):] or os.path.sep + # Don't touch unmanaged files outside of the given mountpoint. + if not dst_path.startswith(mountpoint): + continue + all_paths.add(dst_path) + all_files.update(set([os.path.join(dst_path, f) for f in files])) + unmanaged_file = all_files.difference(final_files.keys()) + unmanaged_path = all_paths.difference(final_dirs) + + if verbose and (len(unmanaged_file) or len(unmanaged_path)): + print 'Removing unmanaged paths:\n {}'.format( + '\n '.join(sorted(unmanaged_file.union(unmanaged_path)))) + for f in unmanaged_file: + os.remove(os.path.join(cache_root, f.lstrip(os.path.sep))) + for p in reversed(sorted(unmanaged_path)): + os.rmdir(os.path.join(cache_root, p.lstrip(os.path.sep))) + + # Sort the list from most specific to least specific. + dsts = sorted(fs_config_dirs.keys(), key=lambda k: k.count(os.path.sep), + reverse=True) + sorted_dirs = list() + for dst in dsts: + sorted_dirs.append(fs_config_dirs[dst]) + + # The sysroot was nestled one deep at root/, so meta files + # can be stored in the parent without a risk of conflict. + if not os.path.isdir(os.path.join(cache_dir, 'etc')): + os.makedirs(os.path.join(cache_dir, 'etc')) + with open(os.path.join(cache_dir, 'etc', 'fs_config_files'), 'w') as f: + f.write(''.join(fs_config_files.values())) + with open(os.path.join(cache_dir, 'etc', 'fs_config_dirs'), 'w') as f: + f.write(''.join(sorted_dirs)) + with open(os.path.join(cache_dir, 'file_contexts'), 'w') as f: + f.write('\n'.join(file_context)) + + return uncached def _CreateBuildProps(build_root, product_out, image_type, info_file): - if image_type == IMAGE_TYPE_ODM: - build_props = { - 'mount_point': 'odm', - 'fs_type': 'ext4', - # TODO(b/27854052): Get the size in here. - 'partition_size': '134217728', - 'extfs_sparse_flag': '-s', - 'skip_fsck': 'true', - 'selinux_fc': build_root.Path('root', 'file_contexts.bin'), - } - build_root.WriteFile(info_file, '\n'.join( - ['{}={}'.format(k, v) for k, v in build_props.iteritems()])) - elif image_type == IMAGE_TYPE_SYSTEM: - # Add and update a copy of system_image_info.txt - build_root.AddFile(os.path.join(product_out, - 'obj', - 'PACKAGING', - 'systemimage_intermediates', - 'system_image_info.txt'), info_file) - for line in fileinput.input(build_root.Path(info_file), inplace=True): - # We just want to change the SELinux file contexts. - if line.startswith('selinux_fc'): - line = 'selinux_fc=' + build_root.Path('root', 'file_contexts.bin') - print line.rstrip() - else: - build_root.WriteFile(info_file, 'mount_point={}'.format(image_type)) + if image_type == IMAGE_TYPE_ODM: + build_props = { + 'mount_point': 'odm', + 'fs_type': 'ext4', + # TODO(b/27854052): Get the size in here. + 'partition_size': '134217728', + 'extfs_sparse_flag': '-s', + 'skip_fsck': 'true', + 'selinux_fc': build_root.Path('root', 'file_contexts.bin'), + } + build_root.WriteFile(info_file, '\n'.join( + ['{}={}'.format(k, v) for k, v in build_props.iteritems()])) + elif image_type == IMAGE_TYPE_SYSTEM: + # Add and update a copy of system_image_info.txt + build_root.AddFile(os.path.join(product_out, + 'obj', + 'PACKAGING', + 'systemimage_intermediates', + 'system_image_info.txt'), info_file) + for line in fileinput.input(build_root.Path(info_file), inplace=True): + # We just want to change the SELinux file contexts. + if line.startswith('selinux_fc'): + line = 'selinux_fc=' + build_root.Path('root', + 'file_contexts.bin') + print line.rstrip() + else: + build_root.WriteFile(info_file, 'mount_point={}'.format(image_type)) def BuildImage(image_type, target, cache_dir, output_dir): - """Builds the image specified by image_type. - - Caller should validate target OS before calling. - - Args: - image_type: The type of the image. One of IMAGE_TYPES. - target: project.target.Target to build an image of. - cache_dir: Directory to build the image in. This path - may be prepared by prior callers. E.g., CreateTargetCache(). - output_dir: Directory to place built .img file in. - - Returns: - The image build exit code. - - Raises: - PathError: An expected directory cannot be found or isn't a dir. - util.HostUnsupportedArchError: The host is an unsupported architecture. - ImageTypeError: An invalid parameter value has been supplied for image_type. - """ - if image_type not in IMAGE_TYPES: - raise ImageTypeError('image_type must be one of {}: {}'.format( - IMAGE_TYPES, image_type)) - # Set some useful variables. - build_tools = util.GetOSPath(target.os_version, - 'build', 'tools', 'releasetools') - platform_out = target.platform_build_cache() - product_out = util.GetAndroidProductOut(platform_out, target.board) - host_arch = util.GetHostArch() - host_tools = os.path.join(platform_out, 'host', host_arch, 'bin') - output_file = os.path.join(output_dir, '{}.img'.format(image_type)) - ret = 1 - - # Check that all required inputs and tools exist. - _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools, - build_tools) - # Check to ensure there are entries in root/image-mountpoint. - if not os.path.exists(os.path.join(cache_dir, 'root', image_type)): - print ('Warning: The specification does not define any destinations ' - 'for this image type: {}'.format(image_type)) - - build_root = sysroot.Sysroot(cache_dir, copy_newer_only=True) - - # Build 'filecontexts.bin' and 'sepolicy' SELinux files. - with target.get_device().linked(target.os_version): - policy.BuildSepolicy(target, platform_out, cache_dir) - policy.BuildFileContexts(target, platform_out, cache_dir) - - _CreateBuildProps(build_root, product_out, image_type, 'image_info.txt') - - # Build an image from the build root. - additional_path = host_tools + os.pathsep + build_tools - if 'PATH' in os.environ: - os.environ['PATH'] += os.pathsep + additional_path - else: - os.environ['PATH'] = additional_path - - # Repoint product out so that changes made locally are reflected - # for the build tooling. This primarily enables the discovery - # of etc/fs_config_{files, dirs}. - product_out = build_root.Path() - ret = subprocess.call(['build_image.py', - build_root.Path('root', image_type), - build_root.Path('image_info.txt'), - output_file, - product_out]) - return ret + """Builds the image specified by image_type. + + Caller should validate target OS before calling. + + Args: + image_type: The type of the image. One of IMAGE_TYPES. + target: project.target.Target to build an image of. + cache_dir: Directory to build the image in. This path + may be prepared by prior callers. E.g., CreateTargetCache(). + output_dir: Directory to place built .img file in. + + Returns: + The image build exit code. + + Raises: + PathError: An expected directory cannot be found or isn't a dir. + util.HostUnsupportedArchError: The host is an unsupported architecture. + ImageTypeError: An invalid parameter value has been supplied for + image_type. + """ + if image_type not in IMAGE_TYPES: + raise ImageTypeError('image_type must be one of {}: {}'.format( + IMAGE_TYPES, image_type)) + # Set some useful variables. + build_tools = util.GetOSPath(target.os_version, + 'build', 'tools', 'releasetools') + platform_out = target.platform_build_cache() + product_out = util.GetAndroidProductOut(platform_out, target.board) + host_arch = util.GetHostArch() + host_tools = os.path.join(platform_out, 'host', host_arch, 'bin') + output_file = os.path.join(output_dir, '{}.img'.format(image_type)) + ret = 1 + + # Check that all required inputs and tools exist. + _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools, + build_tools) + # Check to ensure there are entries in root/image-mountpoint. + if not os.path.exists(os.path.join(cache_dir, 'root', image_type)): + print ('Warning: The specification does not define any destinations ' + 'for this image type: {}'.format(image_type)) + + build_root = sysroot.Sysroot(cache_dir, copy_newer_only=True) + + # Build 'filecontexts.bin' and 'sepolicy' SELinux files. + with target.get_device().linked(target.os_version): + policy.BuildSepolicy(target, platform_out, cache_dir) + policy.BuildFileContexts(target, platform_out, cache_dir) + + _CreateBuildProps(build_root, product_out, image_type, 'image_info.txt') + + # Build an image from the build root. + additional_path = host_tools + os.pathsep + build_tools + if 'PATH' in os.environ: + os.environ['PATH'] += os.pathsep + additional_path + else: + os.environ['PATH'] = additional_path + + # Repoint product out so that changes made locally are reflected + # for the build tooling. This primarily enables the discovery + # of etc/fs_config_{files, dirs}. + product_out = build_root.Path() + ret = subprocess.call(['build_image.py', + build_root.Path('root', image_type), + build_root.Path('image_info.txt'), + output_file, + product_out]) + return ret diff --git a/cli/lib/core/image_build_unittest.py b/cli/lib/core/image_build_unittest.py index 3803845..d6b7782 100644 --- a/cli/lib/core/image_build_unittest.py +++ b/cli/lib/core/image_build_unittest.py @@ -35,477 +35,483 @@ from selinux import policy_stub class TestData(object): - _BSP = 'test_board' - _BSP_VERSION = '0.1' - _BUILD_TYPE = 'user' - _BRILLO_VERSION = '9.9' - _CACHE_DIR = '/product/artifacts' - _PLATFORM_DIR = '/foo/bar' - _IMAGE_OUT_DIR = '/baz/pop' + _BSP = 'test_board' + _BSP_VERSION = '0.1' + _BUILD_TYPE = 'user' + _BRILLO_VERSION = '9.9' + _CACHE_DIR = '/product/artifacts' + _PLATFORM_DIR = '/foo/bar' + _IMAGE_OUT_DIR = '/baz/pop' class BuildImageBase(TestData): - """Base image unit test setup. - - Separate the setup from the default tests, so that BuildUnsupportedImageTest() - can instantiate this object in its test. - """ - - def setUp(self): - self.stub_os = stubs.StubOs() - - self.stub_fileinput = stubs.StubFileinput() - self.stub_open = stubs.StubOpen(self.stub_os) - self.stub_selinux = policy_stub.StubPolicy() - self.stub_subprocess = stubs.StubSubprocess() - self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator() - self.stub_util = util_stub.StubUtil(os_version=self._BRILLO_VERSION) - - image_build.fileinput = self.stub_fileinput - image_build.os = self.stub_os - image_build.open = self.stub_open.open - image_build.policy = self.stub_selinux - image_build.subprocess = self.stub_subprocess - image_build.sysroot = self.stub_sysroot_generator - image_build.util = self.stub_util - - self.device = device_stub.StubDevice( - should_link_version=self._BRILLO_VERSION) - self.target = target_stub.StubTarget( - board=self._BSP, os_version=self._BRILLO_VERSION, device=self.device) - self.target.platform_dir = self._PLATFORM_DIR - - self.product_out_dir = None - self.image_type = None - - def SetupForImageImage_Build(self, image_type='subclassme'): - """Helper: establishes pre-reqs for successful image image_build. - - Returns the expected subprocess call. + """Base image unit test setup. + + Separate the setup from the default tests, so that + BuildUnsupportedImageTest() can instantiate this object in its test. """ - if image_type is 'subclassme': - raise unittest.SkipTest('not in subclass') - self.image_type = image_type - - # Must be linux - self.stub_os.SetUname(('linux', '', '', '', '')) - - # Must have copy dir, product out dir, image out dir, - # host tools, and image_build tools. - platform_dir = self.target.platform_build_cache() - product_out_dir = self.stub_util.GetAndroidProductOut( - platform_dir, self.target.board) - self.product_out_dir = product_out_dir - self.stub_os.path.should_be_dir = [ - product_out_dir, self._IMAGE_OUT_DIR, self._CACHE_DIR, - self.target.platform_build_cache('host', - self.stub_util.GetHostArch(), 'bin'), - self.stub_util.GetOSPath(self._BRILLO_VERSION, - 'build', 'tools', 'releasetools')] - self.stub_os.path.should_exist += copy.deepcopy( - self.stub_os.path.should_be_dir) - - # Willing to make an image_build root in the cache dir. - self.stub_os.should_makedirs = [self._CACHE_DIR] - # Image_Build root should get system_image_info.txt, and modify it. - self.stub_fileinput.should_touch = [self.stub_os.path.join( - self._CACHE_DIR, 'image_info.txt')] - - expected_call = [ - 'build_image.py', - self.stub_os.path.join(self._CACHE_DIR, 'root', image_type), - self.stub_os.path.join(self._CACHE_DIR, 'image_info.txt'), - self.stub_os.path.join( - self._IMAGE_OUT_DIR, '{}.img'.format(image_type)), - self._CACHE_DIR] - - return expected_call + + def setUp(self): + self.stub_os = stubs.StubOs() + + self.stub_fileinput = stubs.StubFileinput() + self.stub_open = stubs.StubOpen(self.stub_os) + self.stub_selinux = policy_stub.StubPolicy() + self.stub_subprocess = stubs.StubSubprocess() + self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator() + self.stub_util = util_stub.StubUtil(os_version=self._BRILLO_VERSION) + + image_build.fileinput = self.stub_fileinput + image_build.os = self.stub_os + image_build.open = self.stub_open.open + image_build.policy = self.stub_selinux + image_build.subprocess = self.stub_subprocess + image_build.sysroot = self.stub_sysroot_generator + image_build.util = self.stub_util + + self.device = device_stub.StubDevice( + should_link_version=self._BRILLO_VERSION) + self.target = target_stub.StubTarget( + board=self._BSP, os_version=self._BRILLO_VERSION, + device=self.device) + self.target.platform_dir = self._PLATFORM_DIR + + self.product_out_dir = None + self.image_type = None + + def SetupForImageImage_Build(self, image_type='subclassme'): + """Helper: establishes pre-reqs for successful image image_build. + + Returns the expected subprocess call. + """ + if image_type is 'subclassme': + raise unittest.SkipTest('not in subclass') + self.image_type = image_type + + # Must be linux + self.stub_os.SetUname(('linux', '', '', '', '')) + + # Must have copy dir, product out dir, image out dir, + # host tools, and image_build tools. + platform_dir = self.target.platform_build_cache() + product_out_dir = self.stub_util.GetAndroidProductOut( + platform_dir, self.target.board) + self.product_out_dir = product_out_dir + self.stub_os.path.should_be_dir = [ + product_out_dir, self._IMAGE_OUT_DIR, self._CACHE_DIR, + self.target.platform_build_cache('host', + self.stub_util.GetHostArch(), + 'bin'), + self.stub_util.GetOSPath(self._BRILLO_VERSION, + 'build', 'tools', 'releasetools')] + self.stub_os.path.should_exist += copy.deepcopy( + self.stub_os.path.should_be_dir) + + # Willing to make an image_build root in the cache dir. + self.stub_os.should_makedirs = [self._CACHE_DIR] + # Image_Build root should get system_image_info.txt, and modify it. + self.stub_fileinput.should_touch = [self.stub_os.path.join( + self._CACHE_DIR, 'image_info.txt')] + + expected_call = [ + 'build_image.py', + self.stub_os.path.join(self._CACHE_DIR, 'root', image_type), + self.stub_os.path.join(self._CACHE_DIR, 'image_info.txt'), + self.stub_os.path.join( + self._IMAGE_OUT_DIR, '{}.img'.format(image_type)), + self._CACHE_DIR] + + return expected_call class BaseTests(object): - """Container for the base image tests. - - Put the base test class within a subclass. Otherwise the unit test - discover function picks them up as actual tests but marks them as skipped. - """ - - class BuildImageBaseTest(BuildImageBase, unittest.TestCase): - """Base set of tests to run for each image type.""" - - def test_image_success(self): - expected_call = self.SetupForImageImage_Build() - command = self.stub_subprocess.AddCommand() - self.assertEqual( - 0, - image_build.BuildImage(self.image_type, self.target, - self._CACHE_DIR, - self._IMAGE_OUT_DIR)) - - command.AssertCallWas(expected_call) - - def test_image_return_exit_code(self): - """Tests that the make exit code is returned.""" - self.stub_subprocess.AddCommand(ret_code=1) - self.SetupForImageImage_Build() - self.assertEqual( - 1, - image_build.BuildImage(self.image_type, self.target, - self._CACHE_DIR, - self._IMAGE_OUT_DIR)) - - def test_image_missing_paths(self): - self.SetupForImageImage_Build() - original_dirs = copy.deepcopy(self.stub_os.path.should_be_dir) - for directory in original_dirs: - self.stub_subprocess.AddCommand() - # Take a dir out. - self.stub_os.path.should_be_dir.remove(directory) - self.stub_os.path.should_exist.remove(directory) - # Check that we fail on all except the output dir. - if directory != self._IMAGE_OUT_DIR: - with self.assertRaises(image_build.PathError): - image_build.BuildImage( - self.image_type, self.target, - self._CACHE_DIR, self._IMAGE_OUT_DIR) - else: - self.stub_os.should_makedirs.append(self._IMAGE_OUT_DIR) - self.assertEqual( - 0, - image_build.BuildImage(self.image_type, self.target, - self._CACHE_DIR, - self._IMAGE_OUT_DIR)) - # Put it back in. - self.stub_os.path.should_be_dir.append(directory) - - def test_image_bad_host(self): - self.SetupForImageImage_Build() - self.stub_util.arch_is_supported = False - self.stub_subprocess.AddCommand() - # Shouldn't reach the linking point. - self.device.should_link_version = None - with self.assertRaises(self.stub_util.HostUnsupportedArchError): - image_build.BuildImage(self.image_type, self.target, - self._CACHE_DIR, self._IMAGE_OUT_DIR) + """Container for the base image tests. + + Put the base test class within a subclass. Otherwise the unit test + discover function picks them up as actual tests but marks them as skipped. + """ + + class BuildImageBaseTest(BuildImageBase, unittest.TestCase): + """Base set of tests to run for each image type.""" + + def test_image_success(self): + expected_call = self.SetupForImageImage_Build() + command = self.stub_subprocess.AddCommand() + self.assertEqual( + 0, + image_build.BuildImage(self.image_type, self.target, + self._CACHE_DIR, self._IMAGE_OUT_DIR)) + + command.AssertCallWas(expected_call) + + def test_image_return_exit_code(self): + """Tests that the make exit code is returned.""" + self.stub_subprocess.AddCommand(ret_code=1) + self.SetupForImageImage_Build() + self.assertEqual( + 1, + image_build.BuildImage(self.image_type, self.target, + self._CACHE_DIR, self._IMAGE_OUT_DIR)) + + def test_image_missing_paths(self): + self.SetupForImageImage_Build() + original_dirs = copy.deepcopy(self.stub_os.path.should_be_dir) + for directory in original_dirs: + self.stub_subprocess.AddCommand() + # Take a dir out. + self.stub_os.path.should_be_dir.remove(directory) + self.stub_os.path.should_exist.remove(directory) + # Check that we fail on all except the output dir. + if directory != self._IMAGE_OUT_DIR: + with self.assertRaises(image_build.PathError): + image_build.BuildImage( + self.image_type, self.target, + self._CACHE_DIR, self._IMAGE_OUT_DIR) + else: + self.stub_os.should_makedirs.append(self._IMAGE_OUT_DIR) + self.assertEqual( + 0, + image_build.BuildImage(self.image_type, self.target, + self._CACHE_DIR, + self._IMAGE_OUT_DIR)) + # Put it back in. + self.stub_os.path.should_be_dir.append(directory) + + def test_image_bad_host(self): + self.SetupForImageImage_Build() + self.stub_util.arch_is_supported = False + self.stub_subprocess.AddCommand() + # Shouldn't reach the linking point. + self.device.should_link_version = None + with self.assertRaises(self.stub_util.HostUnsupportedArchError): + image_build.BuildImage(self.image_type, self.target, + self._CACHE_DIR, self._IMAGE_OUT_DIR) class BuildUnsupportedImageTest(unittest.TestCase): - def test_unsupported_type(self): - bt = BuildImageBase() - bt.setUp() - bt.SetupForImageImage_Build('unknown') - with self.assertRaises(image_build.ImageTypeError): - # pylint: disable=protected-access - image_build.BuildImage(bt.image_type, bt.target, - bt._CACHE_DIR, bt._IMAGE_OUT_DIR) + def test_unsupported_type(self): + bt = BuildImageBase() + bt.setUp() + bt.SetupForImageImage_Build('unknown') + with self.assertRaises(image_build.ImageTypeError): + # pylint: disable=protected-access + image_build.BuildImage(bt.image_type, bt.target, + bt._CACHE_DIR, bt._IMAGE_OUT_DIR) class BuildOdmImageTest(BaseTests.BuildImageBaseTest): - def SetupForImageImage_Build(self, image_type='odm'): - """Helper: establishes pre-reqs for successful image image_build. + def SetupForImageImage_Build(self, image_type='odm'): + """Helper: establishes pre-reqs for successful image image_build. - Returns the expected subprocess call. - """ - call = super(BuildOdmImageTest, self).SetupForImageImage_Build('odm') - self.stub_sysroot_generator.should_write += ['image_info.txt'] - return call + Returns the expected subprocess call. + """ + call = super(BuildOdmImageTest, self).SetupForImageImage_Build('odm') + self.stub_sysroot_generator.should_write += ['image_info.txt'] + return call class BuildSystemImageTest(BaseTests.BuildImageBaseTest): - def SetupForImageImage_Build(self, image_type='system'): - """Helper: establishes pre-reqs for successful image image_build. - - Returns the expected subprocess call. - """ - expected_call = super(BuildSystemImageTest, self).SetupForImageImage_Build( - 'system') - # Make sure the built system is copied to system. - self.stub_sysroot_generator.should_add_dir += [ - (self.stub_os.path.join(self.product_out_dir, 'system'), - 'root/system', True) - ] - # Image_Build root should get system_image_info.txt, and modify it. - self.stub_sysroot_generator.should_add_file = [ - (self.stub_os.path.join(self.product_out_dir, 'obj', 'PACKAGING', - 'systemimage_intermediates', - 'system_image_info.txt'), 'image_info.txt')] - return expected_call + def SetupForImageImage_Build(self, image_type='system'): + """Helper: establishes pre-reqs for successful image image_build. + + Returns the expected subprocess call. + """ + expected_call = super(BuildSystemImageTest, + self).SetupForImageImage_Build('system') + # Make sure the built system is copied to system. + self.stub_sysroot_generator.should_add_dir += [ + (self.stub_os.path.join(self.product_out_dir, 'system'), + 'root/system', True) + ] + # Image_Build root should get system_image_info.txt, and modify it. + self.stub_sysroot_generator.should_add_file = [ + (self.stub_os.path.join(self.product_out_dir, 'obj', + 'PACKAGING', + 'systemimage_intermediates', + 'system_image_info.txt'), + 'image_info.txt')] + return expected_call class CreateTargetCacheTest(TestData, unittest.TestCase): - def setUp(self): - self.spec = project_spec_stub.StubProjectSpec() - self.target = None - self.cache_dir = '/base/path' - - p = pack.Pack('my_project', 'main') - self.copy = pack.Copy(p) - self.copy.src = '/some/source.txt' - self.copy.src_type = pack.CopyType.FILE - - self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator() - self.stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(self.stub_os) - self.stub_glob = stubs.StubGlob(self.stub_os) - - image_build.glob = self.stub_glob - image_build.os = self.stub_os - image_build.open = self.stub_open.open - image_build.sysroot = self.stub_sysroot_generator - - self.metadata_files = [ - self.stub_os.path.join(self.cache_dir, 'etc'), - self.cache_dir, - self.stub_os.path.sep + self.cache_dir.split(self.stub_os.path.sep)[1], - self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_files'), - self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_dirs'), - self.stub_os.path.join(self.cache_dir, 'file_contexts'), - ] - # This always happens unless it fails early. - self.stub_os.should_makedirs += [ - self.stub_os.path.join(self.cache_dir, 'etc')] - - def tearDown(self): - # Make sure all files are copied. - for gen in self.stub_sysroot_generator.sysroots: - self.assertEqual(gen.should_add_file, []) - self.assertEqual(gen.should_add_dir, []) - self.assertEqual(gen.should_add_glob, []) - - def test_dependency_error(self): - target = target_stub.StubTarget(board=self._BSP, - submap_raises=dependency.Error('dep error')) - with self.assertRaises(dependency.Error): - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - - def test_skip_os(self): - p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') - simple_map = packmap_stub.StubPackMap( - destinations={'/system/bin/servicemanager': [pack.Copy(p)]}) - target = target_stub.StubTarget( - os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, - submaps=[simple_map]) - # sysroot stub will not be touched because the OS will be under /system. - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - mountpoint='/odm') - - def test_with_os(self): - p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') - cpy = pack.Copy(p, - dst='/system/bin/servicemanager', dst_type=pack.CopyType.FILE, - src='/tmp/a_file', src_type=pack.CopyType.FILE) - cpy.override_build = False - simple_map = packmap_stub.StubPackMap( - destinations={cpy.dst: [cpy]}) - target = target_stub.StubTarget( - os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, - submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(cpy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(cpy.src, - cpy.dst.lstrip('/'))] - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - mountpoint='/') - - def test_skip_nonmountpoint(self): - self.copy.dst = '/system/bin/xyzzy.txt' - self.copy.dst_type = pack.CopyType.FILE - odm_copy = pack.Copy(self.copy.pack) - odm_copy.dst = '/odm/bin/xyzzy.txt' - odm_copy.dst_type = pack.CopyType.FILE - odm_copy.src = self.copy.src - odm_copy.src_type = self.copy.src_type - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy], odm_copy.dst: [odm_copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(odm_copy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(odm_copy.src, - 'odm/bin/xyzzy.txt')] - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - mountpoint='/odm') - - def test_copy_file_src_file_dst(self): - self.copy.dst = '/system/bin/xyzzy.txt' - self.copy.dst_type = pack.CopyType.FILE - self.copy.acl.selabel = 'u:object_r:system_foo:s0' - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(self.copy.src, - 'system/bin/xyzzy.txt')] - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - # Ensure the selabel was copied. - file_contexts = self.stub_open.files[ - self.stub_os.path.join(self.cache_dir, 'file_contexts')].contents - self.assertRegexpMatches('\n'.join(file_contexts), - '{}($|\n)'.format(self.copy.acl.file_context())) - # Ensure the ACLs were copied too. - fs_config_files = self.stub_open.files[ - self.stub_os.path.join(self.cache_dir, 'etc', - 'fs_config_files')].contents - self.assertRegexpMatches( - '\n'.join(fs_config_files), - '{}($|\n)'.format(self.copy.acl.fs_config(binary=True))) - - def test_copy_system_with_custom_deep_file(self): - # When copying a "system" pack, we rely on the compiled in fs_config_* - # so let's make sure we don't override it. - - # A custom file. - self.copy.dst = '/system/bin/xyzzy.txt' - self.copy.dst_type = pack.CopyType.FILE - self.copy.acl.user = 1234 - self.copy.acl.group = 4321 - - p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') - cpy = pack.Copy(p, - dst='/system/bin/servicemanager', dst_type=pack.CopyType.FILE, - src='/tmp/a_file', src_type=pack.CopyType.FILE) - cpy.acl.override_build = False - simple_map = packmap_stub.StubPackMap( - destinations={cpy.dst: [cpy], - self.copy.dst: [self.copy]}) - target = target_stub.StubTarget( - os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, - submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:], - self.stub_os.path.dirname(cpy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(cpy.src, - cpy.dst.lstrip('/')), - (self.copy.src, - 'system/bin/xyzzy.txt')] - - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - mountpoint='/') - - # Ensure the ACLs were copied. - fs_config_files = self.stub_open.files[ - self.stub_os.path.join(self.cache_dir, 'etc', - 'fs_config_files')].contents - self.assertEqual( - ['%\x00\x00\x01\xd2\x04\xe1\x10\x00\x00\x00\x00\x00\x00\x00\x00system/bin/xyzzy.txt\x00'], - fs_config_files) - # Ensure that no directory ACLs were implicitly placed on - # system or system/bin/. - fs_config_dirs = self.stub_open.files[ - self.stub_os.path.join(self.cache_dir, 'etc', - 'fs_config_dirs')].contents - self.assertEqual(fs_config_dirs, ['']) - - def test_copy_glob(self): - self.copy.src = '/my/bins/*' - self.copy.src_type = pack.CopyType.GLOB - self.copy.recurse = False - self.copy.dst = '/system/bin/' - self.copy.dst_type = pack.CopyType.DIR - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_glob = [ - (self.copy.src, 'system/bin/', False)] - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - - def test_copy_glob_recursive(self): - self.copy.src = '/my/bins/*' - self.copy.src_type = pack.CopyType.GLOB - self.copy.recurse = True - self.copy.dst = '/system/bin/' - self.copy.dst_type = pack.CopyType.DIR - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_glob = [ - (self.copy.src, 'system/bin/', True)] - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - - def test_copy_dir(self): - self.copy.src = '/my/etc/' - self.copy.src_type = pack.CopyType.DIR - self.copy.recurse = False - self.copy.dst = '/system/etc/' - self.copy.dst_type = pack.CopyType.DIR - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_dir = [ - (self.copy.src, 'system/etc/', False)] - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - - def test_copy_dir_recurse(self): - self.copy.src = '/my/etc/' - self.copy.src_type = pack.CopyType.DIR - self.copy.recurse = True - self.copy.dst = '/system/etc/' - self.copy.dst_type = pack.CopyType.DIR - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_dir = [ - (self.copy.src, 'system/etc/', True)] - image_build.CreateTargetCache(self.spec, target, self.cache_dir) - - def test_copy_invalid_combos(self): - combos = [ - (pack.CopyType.DIR, pack.CopyType.FILE, False), - (pack.CopyType.DIR, pack.CopyType.FILE, True), - (pack.CopyType.FILE, pack.CopyType.GLOB, False), - (pack.CopyType.FILE, pack.CopyType.GLOB, True), - (pack.CopyType.GLOB, pack.CopyType.FILE, False), - (pack.CopyType.GLOB, pack.CopyType.FILE, True), - ] - for tup in combos: - self.copy.src_type = tup[0] - self.copy.dst_type = tup[1] - self.copy.dst = '/system/bin/xyzzy.txt' - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - with self.assertRaises(image_build.PathError): + def setUp(self): + self.spec = project_spec_stub.StubProjectSpec() + self.target = None + self.cache_dir = '/base/path' + + p = pack.Pack('my_project', 'main') + self.copy = pack.Copy(p) + self.copy.src = '/some/source.txt' + self.copy.src_type = pack.CopyType.FILE + + self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator() + self.stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(self.stub_os) + self.stub_glob = stubs.StubGlob(self.stub_os) + + image_build.glob = self.stub_glob + image_build.os = self.stub_os + image_build.open = self.stub_open.open + image_build.sysroot = self.stub_sysroot_generator + + self.metadata_files = [ + self.stub_os.path.join(self.cache_dir, 'etc'), + self.cache_dir, + self.stub_os.path.sep + + self.cache_dir.split(self.stub_os.path.sep)[1], + self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_files'), + self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_dirs'), + self.stub_os.path.join(self.cache_dir, 'file_contexts'), + ] + # This always happens unless it fails early. + self.stub_os.should_makedirs += [ + self.stub_os.path.join(self.cache_dir, 'etc')] + + def tearDown(self): + # Make sure all files are copied. + for gen in self.stub_sysroot_generator.sysroots: + self.assertEqual(gen.should_add_file, []) + self.assertEqual(gen.should_add_dir, []) + self.assertEqual(gen.should_add_glob, []) + + def test_dependency_error(self): + target = target_stub.StubTarget( + board=self._BSP, submap_raises=dependency.Error('dep error')) + with self.assertRaises(dependency.Error): + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + + def test_skip_os(self): + p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') + simple_map = packmap_stub.StubPackMap( + destinations={'/system/bin/servicemanager': [pack.Copy(p)]}) + target = target_stub.StubTarget( + os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, + submaps=[simple_map]) + # sysroot stub will not be touched because the OS will be under /system. + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + mountpoint='/odm') + + def test_with_os(self): + p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') + cpy = pack.Copy(p, dst='/system/bin/servicemanager', + dst_type=pack.CopyType.FILE, src='/tmp/a_file', + src_type=pack.CopyType.FILE) + cpy.override_build = False + simple_map = packmap_stub.StubPackMap( + destinations={cpy.dst: [cpy]}) + target = target_stub.StubTarget( + os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, + submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(cpy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(cpy.src, + cpy.dst.lstrip('/'))] + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + mountpoint='/') + + def test_skip_nonmountpoint(self): + self.copy.dst = '/system/bin/xyzzy.txt' + self.copy.dst_type = pack.CopyType.FILE + odm_copy = pack.Copy(self.copy.pack) + odm_copy.dst = '/odm/bin/xyzzy.txt' + odm_copy.dst_type = pack.CopyType.FILE + odm_copy.src = self.copy.src + odm_copy.src_type = self.copy.src_type + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy], odm_copy.dst: [odm_copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(odm_copy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(odm_copy.src, + 'odm/bin/xyzzy.txt')] + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + mountpoint='/odm') + + def test_copy_file_src_file_dst(self): + self.copy.dst = '/system/bin/xyzzy.txt' + self.copy.dst_type = pack.CopyType.FILE + self.copy.acl.selabel = 'u:object_r:system_foo:s0' + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(self.copy.src, + 'system/bin/xyzzy.txt')] + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + # Ensure the selabel was copied. + file_contexts = self.stub_open.files[ + self.stub_os.path.join(self.cache_dir, 'file_contexts')].contents + self.assertRegexpMatches( + '\n'.join(file_contexts), + '{}($|\n)'.format(self.copy.acl.file_context())) + # Ensure the ACLs were copied too. + fs_config_files = self.stub_open.files[ + self.stub_os.path.join(self.cache_dir, 'etc', + 'fs_config_files')].contents + self.assertRegexpMatches( + '\n'.join(fs_config_files), + '{}($|\n)'.format(self.copy.acl.fs_config(binary=True))) + + def test_copy_system_with_custom_deep_file(self): + # When copying a "system" pack, we rely on the compiled in fs_config_* + # so let's make sure we don't override it. + + # A custom file. + self.copy.dst = '/system/bin/xyzzy.txt' + self.copy.dst_type = pack.CopyType.FILE + self.copy.acl.user = 1234 + self.copy.acl.group = 4321 + + p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff') + cpy = pack.Copy(p, dst='/system/bin/servicemanager', + dst_type=pack.CopyType.FILE, src='/tmp/a_file', + src_type=pack.CopyType.FILE) + cpy.acl.override_build = False + simple_map = packmap_stub.StubPackMap( + destinations={cpy.dst: [cpy], + self.copy.dst: [self.copy]}) + target = target_stub.StubTarget( + os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP, + submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:], + self.stub_os.path.dirname(cpy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(cpy.src, + cpy.dst.lstrip('/')), + (self.copy.src, + 'system/bin/xyzzy.txt')] + + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + mountpoint='/') + + # Ensure the ACLs were copied. + fs_config_files = self.stub_open.files[ + self.stub_os.path.join(self.cache_dir, 'etc', + 'fs_config_files')].contents + self.assertEqual( + ['%\x00\x00\x01\xd2\x04\xe1\x10\x00\x00\x00\x00\x00\x00\x00\x00' + 'system/bin/xyzzy.txt\x00'], + fs_config_files) + # Ensure that no directory ACLs were implicitly placed on + # system or system/bin/. + fs_config_dirs = self.stub_open.files[ + self.stub_os.path.join(self.cache_dir, 'etc', + 'fs_config_dirs')].contents + self.assertEqual(fs_config_dirs, ['']) + + def test_copy_glob(self): + self.copy.src = '/my/bins/*' + self.copy.src_type = pack.CopyType.GLOB + self.copy.recurse = False + self.copy.dst = '/system/bin/' + self.copy.dst_type = pack.CopyType.DIR + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_glob = [ + (self.copy.src, 'system/bin/', False)] + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + + def test_copy_glob_recursive(self): + self.copy.src = '/my/bins/*' + self.copy.src_type = pack.CopyType.GLOB + self.copy.recurse = True + self.copy.dst = '/system/bin/' + self.copy.dst_type = pack.CopyType.DIR + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_glob = [ + (self.copy.src, 'system/bin/', True)] + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + + def test_copy_dir(self): + self.copy.src = '/my/etc/' + self.copy.src_type = pack.CopyType.DIR + self.copy.recurse = False + self.copy.dst = '/system/etc/' + self.copy.dst_type = pack.CopyType.DIR + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_dir = [ + (self.copy.src, 'system/etc/', False)] + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + + def test_copy_dir_recurse(self): + self.copy.src = '/my/etc/' + self.copy.src_type = pack.CopyType.DIR + self.copy.recurse = True + self.copy.dst = '/system/etc/' + self.copy.dst_type = pack.CopyType.DIR + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_dir = [ + (self.copy.src, 'system/etc/', True)] image_build.CreateTargetCache(self.spec, target, self.cache_dir) - def test_unmanaged_file(self): - self.copy.dst = '/system/bin/xyzzy.txt' - self.copy.dst_type = pack.CopyType.FILE - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(self.copy.src, - 'system/bin/xyzzy.txt')] - self.stub_os.path.should_exist.append('/base/path/root/system/foozle/blag') - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - verbose=False) - # The metadata files should've been created. - expect_files = self.metadata_files - self.assertEqual(self.stub_os.path.should_exist, expect_files) - - def test_unmanaged_dir_dir_file(self): - self.copy.dst = '/system/bin/xyzzy.txt' - self.copy.dst_type = pack.CopyType.FILE - simple_map = packmap_stub.StubPackMap( - destinations={self.copy.dst: [self.copy]}) - target = target_stub.StubTarget(submaps=[simple_map]) - self.stub_sysroot_generator.should_makedirs = [ - self.stub_os.path.dirname(self.copy.dst)[1:]] - self.stub_sysroot_generator.should_add_file = [(self.copy.src, - 'system/bin/xyzzy.txt')] - self.stub_os.path.should_exist.append( - '/base/path/root/system/foozle/barzle/blag.mal') - image_build.CreateTargetCache(self.spec, target, self.cache_dir, - verbose=False) - expect_files = self.metadata_files - self.assertEqual(self.stub_os.path.should_exist, expect_files) + def test_copy_invalid_combos(self): + combos = [ + (pack.CopyType.DIR, pack.CopyType.FILE, False), + (pack.CopyType.DIR, pack.CopyType.FILE, True), + (pack.CopyType.FILE, pack.CopyType.GLOB, False), + (pack.CopyType.FILE, pack.CopyType.GLOB, True), + (pack.CopyType.GLOB, pack.CopyType.FILE, False), + (pack.CopyType.GLOB, pack.CopyType.FILE, True), + ] + for tup in combos: + self.copy.src_type = tup[0] + self.copy.dst_type = tup[1] + self.copy.dst = '/system/bin/xyzzy.txt' + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + with self.assertRaises(image_build.PathError): + image_build.CreateTargetCache(self.spec, target, self.cache_dir) + + def test_unmanaged_file(self): + self.copy.dst = '/system/bin/xyzzy.txt' + self.copy.dst_type = pack.CopyType.FILE + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(self.copy.src, + 'system/bin/xyzzy.txt')] + self.stub_os.path.should_exist.append( + '/base/path/root/system/foozle/blag') + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + verbose=False) + # The metadata files should've been created. + expect_files = self.metadata_files + self.assertEqual(self.stub_os.path.should_exist, expect_files) + + def test_unmanaged_dir_dir_file(self): + self.copy.dst = '/system/bin/xyzzy.txt' + self.copy.dst_type = pack.CopyType.FILE + simple_map = packmap_stub.StubPackMap( + destinations={self.copy.dst: [self.copy]}) + target = target_stub.StubTarget(submaps=[simple_map]) + self.stub_sysroot_generator.should_makedirs = [ + self.stub_os.path.dirname(self.copy.dst)[1:]] + self.stub_sysroot_generator.should_add_file = [(self.copy.src, + 'system/bin/xyzzy.txt')] + self.stub_os.path.should_exist.append( + '/base/path/root/system/foozle/barzle/blag.mal') + image_build.CreateTargetCache(self.spec, target, self.cache_dir, + verbose=False) + expect_files = self.metadata_files + self.assertEqual(self.stub_os.path.should_exist, expect_files) diff --git a/cli/lib/core/popen.py b/cli/lib/core/popen.py index 785e1a4..d71fecf 100644 --- a/cli/lib/core/popen.py +++ b/cli/lib/core/popen.py @@ -21,7 +21,7 @@ import subprocess def PopenPiped(args, **kwargs): - """A wrapper around Popen with stdout/stderr pipes.""" - return subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, **kwargs) + """A wrapper around Popen with stdout/stderr pipes.""" + return subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, **kwargs) diff --git a/cli/lib/core/product.py b/cli/lib/core/product.py index e50df49..7561d59 100644 --- a/cli/lib/core/product.py +++ b/cli/lib/core/product.py @@ -25,137 +25,142 @@ from core import util class ProductCreatorError(Exception): - pass + pass class ProductCreator(object): - def __init__(self, name, vendor, device): - """Initializes the ProductCreators internal properties""" - self._name = name - self._config = None - self._device = device - self._vendor = vendor - - def exists(self): - """Returns true if a subdirectory with the product name exists""" - try: - return os.path.exists(self._name) - except OSError: - return False - - def create(self): - """Creates a subdirectory for the product and creates templates""" - self._create_empty() - - # Populate the envsetup.sh file. - env = Environment(self._config, self._name) - env.envsetup() - - def _init_config(self): - # Populate the configuration store - self._config = config.ProductFileStore(os.path.abspath(self._name)) - self._config.name = self._name - self._config.brand = 'Brillo' - self._config.manufacturer = self._vendor - self._config.manufacturer = self._vendor - self._config.device = self._device - self._config.bdk.buildtype = 'userdebug' - self._config.bdk.version = util.GetBDKVersion() - self._config.copy_files = """ + def __init__(self, name, vendor, device): + """Initializes the ProductCreators internal properties""" + self._name = name + self._config = None + self._device = device + self._vendor = vendor + + def exists(self): + """Returns true if a subdirectory with the product name exists""" + try: + return os.path.exists(self._name) + except OSError: + return False + + def create(self): + """Creates a subdirectory for the product and creates templates""" + self._create_empty() + + # Populate the envsetup.sh file. + env = Environment(self._config, self._name) + env.envsetup() + + def _init_config(self): + # Populate the configuration store + self._config = config.ProductFileStore(os.path.abspath(self._name)) + self._config.name = self._name + self._config.brand = 'Brillo' + self._config.manufacturer = self._vendor + self._config.manufacturer = self._vendor + self._config.device = self._device + self._config.bdk.buildtype = 'userdebug' + self._config.bdk.version = util.GetBDKVersion() + self._config.copy_files = """ # Format: # path-in-product-dir:path-to-install-in-device # E.g., weaved.conf:system/etc/weaved/weaved.conf - """ - self._config.brillo.product_id = ( - 'developer-boards:brillo-starter-board-<REPLACE ME>') - self._config.brillo.crash_server = 'https://clients2.google.com/bc/report' - # Building without Java isn't completely supported yet. - # b/25281898 - self._config.bdk.java = '1' - - def _create_common(self): - paths = [self._name, os.path.join(self._name, 'out'), - os.path.join(self._name, 'sepolicy'), - os.path.join(self._name, 'src')] - for path in paths: - if not os.path.exists(path): - os.makedirs(path) - self._init_config() - - with open(os.path.join(self._name, 'AndroidProducts.mk'), 'w') as f: - f.write(product_templates.ANDROIDPRODUCTS_MK.substitute( - self._config.dict())) - - def _create_empty(self): - """Creates a subdirectory for the product and creates templates""" - self._create_common() - with open(os.path.join(self._name, ('%s.mk' % self._name)), 'w') as f: - f.write(product_templates.PRODUCT_MK.substitute(self._config.dict())) - - # SELinux templates. - sepolicy_dir = os.path.join(self._name, 'sepolicy') - service_te_file = os.path.join(sepolicy_dir, '%s_service.te' % self._name) - - with open(service_te_file, 'w') as f: - f.write(product_templates.SELINUX_DOMAIN.substitute(self._config.dict())) - - with open(os.path.join(sepolicy_dir, 'file_contexts'), 'w') as f: - f.write(product_templates.SELINUX_FILE_CONTEXTS.substitute( - self._config.dict())) + """ + self._config.brillo.product_id = ( + 'developer-boards:brillo-starter-board-<REPLACE ME>') + self._config.brillo.crash_server = ( + 'https://clients2.google.com/bc/report') + # Building without Java isn't completely supported yet. + # b/25281898 + self._config.bdk.java = '1' + + def _create_common(self): + paths = [self._name, os.path.join(self._name, 'out'), + os.path.join(self._name, 'sepolicy'), + os.path.join(self._name, 'src')] + for path in paths: + if not os.path.exists(path): + os.makedirs(path) + self._init_config() + + with open(os.path.join(self._name, 'AndroidProducts.mk'), 'w') as f: + f.write(product_templates.ANDROIDPRODUCTS_MK.substitute( + self._config.dict())) + + def _create_empty(self): + """Creates a subdirectory for the product and creates templates""" + self._create_common() + with open(os.path.join(self._name, ('%s.mk' % self._name)), 'w') as f: + f.write( + product_templates.PRODUCT_MK.substitute(self._config.dict())) + + # SELinux templates. + sepolicy_dir = os.path.join(self._name, 'sepolicy') + service_te_file = os.path.join(sepolicy_dir, + '%s_service.te' % self._name) + + with open(service_te_file, 'w') as f: + f.write( + product_templates.SELINUX_DOMAIN.substitute( + self._config.dict())) + + with open(os.path.join(sepolicy_dir, 'file_contexts'), 'w') as f: + f.write(product_templates.SELINUX_FILE_CONTEXTS.substitute( + self._config.dict())) class Environment(object): - """This class is used to generate a shell environment meant to streamline - the use of the BDK while keeping it familiar to Android device developers. - - Note: None of the templates other than ENVSETUP should contain single - quotes ('). If they do, it will break during sourcing of envsetup.sh. - """ - - def __init__(self, config_, product_dir=util.GetProductDir()): - self._config = config_ - self._product_dir = product_dir - - def banner(self): - """Return the banner to be printed after envsetup.sh is sourced""" - bdk_path = util.GetBDKPath() - bdk_version = util.GetBDKVersion() - # TODO(b/25952629) Add support for a BDK warning if an update is needed. - bdk_warning = '' - return product_templates.BDK_BANNER.substitute( - self._config.dict(), - bdk_version=bdk_version, - bdk_path=bdk_path, - bdk_warning=bdk_warning) - - def exports(self): - """Export shell environment variables for calling dev tools""" - bdk_path = util.DEPRECATED_GetDefaultOSPath() - return product_templates.ENV_EXPORTS.substitute( - self._config.dict(), - bdk_path=bdk_path, - product_path=self._product_dir, - cli_path=util.GetBDKPath('cli'), - target_out=os.path.join(self._product_dir, 'out', - 'out-%s' % (self._config.device), - 'target', 'product', self._config.device) - ) - - def aliases(self): - """Wrap bdk commands with shell functions""" - return product_templates.ENV_ALIASES.substitute( - self._config.dict(), product_path=self._product_dir) - - def environment(self): - """Returns a complete shell environment for product development""" - return self.banner() + self.exports() + self.aliases() - - def envsetup(self): - """Generates a file which will run bdk product envsetup""" - bdk_path = util.DEPRECATED_GetDefaultOSPath() - tools_path = util.GetBDKPath() - with open(os.path.join(self._product_dir, 'envsetup.sh'), 'w') as f: - f.write(product_templates.ENVSETUP.substitute( - self._config.dict(), bdk_path=bdk_path, - product_path=self._product_dir, - tools_path=tools_path)) + """This class is used to generate a shell environment meant to streamline + the use of the BDK while keeping it familiar to Android device developers. + + Note: None of the templates other than ENVSETUP should contain single + quotes ('). If they do, it will break during sourcing of envsetup.sh. + """ + + def __init__(self, config_, product_dir=util.GetProductDir()): + self._config = config_ + self._product_dir = product_dir + + def banner(self): + """Return the banner to be printed after envsetup.sh is sourced.""" + bdk_path = util.GetBDKPath() + bdk_version = util.GetBDKVersion() + # TODO(b/25952629) Add support for a BDK warning if an update is needed. + bdk_warning = '' + return product_templates.BDK_BANNER.substitute( + self._config.dict(), + bdk_version=bdk_version, + bdk_path=bdk_path, + bdk_warning=bdk_warning) + + def exports(self): + """Export shell environment variables for calling dev tools""" + bdk_path = util.DEPRECATED_GetDefaultOSPath() + return product_templates.ENV_EXPORTS.substitute( + self._config.dict(), + bdk_path=bdk_path, + product_path=self._product_dir, + cli_path=util.GetBDKPath('cli'), + target_out=os.path.join(self._product_dir, 'out', + 'out-%s' % (self._config.device), + 'target', 'product', self._config.device) + ) + + def aliases(self): + """Wrap bdk commands with shell functions""" + return product_templates.ENV_ALIASES.substitute( + self._config.dict(), product_path=self._product_dir) + + def environment(self): + """Returns a complete shell environment for product development""" + return self.banner() + self.exports() + self.aliases() + + def envsetup(self): + """Generates a file which will run bdk product envsetup""" + bdk_path = util.DEPRECATED_GetDefaultOSPath() + tools_path = util.GetBDKPath() + with open(os.path.join(self._product_dir, 'envsetup.sh'), 'w') as f: + f.write(product_templates.ENVSETUP.substitute( + self._config.dict(), bdk_path=bdk_path, + product_path=self._product_dir, + tools_path=tools_path)) diff --git a/cli/lib/core/properties.py b/cli/lib/core/properties.py index 2d14508..cc8793f 100644 --- a/cli/lib/core/properties.py +++ b/cli/lib/core/properties.py @@ -21,249 +21,253 @@ import string class PropEntry(object): - """Leaf node in the property tree representing the final value.""" - def __init__(self, name, full_key, legal_values, setter, getter): - self._name = name - self._full_key = full_key - self._legal_values = legal_values - self._getter = getter - self._setter = setter + """Leaf node in the property tree representing the final value.""" + def __init__(self, name, full_key, legal_values, setter, getter): + self._name = name + self._full_key = full_key + self._legal_values = legal_values + self._getter = getter + self._setter = setter - def name(self): - return self._name + def name(self): + return self._name - def get(self): - return self._getter(self._full_key) + def get(self): + return self._getter(self._full_key) - def set(self, val): - if self._legal_values and val not in self._legal_values: - raise ValueError('Cannot set {} to {} (legal values are {}).'.format( - self._full_key, val, self._legal_values)) - return self._setter(self._full_key, val) + def set(self, val): + if self._legal_values and val not in self._legal_values: + raise ValueError( + 'Cannot set {} to {} (legal values are {}).'.format( + self._full_key, val, self._legal_values)) + return self._setter(self._full_key, val) - def __repr__(self): - return 'PropEntry(%s)' % self._name + def __repr__(self): + return 'PropEntry(%s)' % self._name - def __str__(self): - return 'PropEntry(name=%s)' % self._name + def __str__(self): + return 'PropEntry(name=%s)' % self._name class PropGroup(object): - """Group of groups or entries used by PropBase""" - def __init__(self, name, parent): - self._name = name - self._parent = parent - self._children = [] - - def __setattr__(self, key, val): - if key.startswith('_'): - super(PropGroup, self).__setattr__(key, val) - else: - if self.has_child(key): - c = self.child(key) - if type(c) == PropEntry: - return c.set(val) - p = self._parent - prefix = self.name() - while type(p) == PropGroup: - if p.name() != 'root': - prefix = string.join([p.name(), prefix], '.') - p = p.parent() - raise AttributeError( - "AttributeError: '{}' object has no attribute '{}'".format( - type(p), string.join([prefix, key], '.'))) - - def __getattr__(self, key): - if key.startswith('_'): - return super(PropGroup, self).__getattr__(key) - entry = string.split(key, '/') - if self.has_child(entry[0]): - c = self.child(entry[0]) - if type(c) == PropEntry: - return c.get() - return c - p = self._parent - prefix = self._name - while type(p) == PropGroup: - if p.name() != 'root': - prefix = string.join([p.name(), prefix], '.') - p = p.parent() - raise AttributeError( - "AttributeError: '{}' object has no attribute '{}'".format( - type(p), string.join([prefix, key], '.'))) - - def name(self): - return self._name - - def parent(self): - return self._parent - - def children(self): - return self._children - - def add_child(self, child): - self._children.append(child) - - def child(self, child_name): - for c in self._children: - if c.name() == child_name: - return c - raise KeyError, child_name - - def has_child(self, child_name): - for c in self._children: - if c.name() == child_name: - return True - return False + """Group of groups or entries used by PropBase""" + def __init__(self, name, parent): + self._name = name + self._parent = parent + self._children = [] + + def __setattr__(self, key, val): + if key.startswith('_'): + super(PropGroup, self).__setattr__(key, val) + else: + if self.has_child(key): + c = self.child(key) + if type(c) == PropEntry: + return c.set(val) + p = self._parent + prefix = self.name() + while type(p) == PropGroup: + if p.name() != 'root': + prefix = string.join([p.name(), prefix], '.') + p = p.parent() + raise AttributeError( + "AttributeError: '{}' object has no attribute '{}'".format( + type(p), string.join([prefix, key], '.'))) + + def __getattr__(self, key): + if key.startswith('_'): + return super(PropGroup, self).__getattr__(key) + entry = string.split(key, '/') + if self.has_child(entry[0]): + c = self.child(entry[0]) + if type(c) == PropEntry: + return c.get() + return c + p = self._parent + prefix = self._name + while type(p) == PropGroup: + if p.name() != 'root': + prefix = string.join([p.name(), prefix], '.') + p = p.parent() + raise AttributeError( + "AttributeError: '{}' object has no attribute '{}'".format( + type(p), string.join([prefix, key], '.'))) + + def name(self): + return self._name + + def parent(self): + return self._parent + + def children(self): + return self._children + + def add_child(self, child): + self._children.append(child) + + def child(self, child_name): + for c in self._children: + if c.name() == child_name: + return c + raise KeyError, child_name + + def has_child(self, child_name): + for c in self._children: + if c.name() == child_name: + return True + return False - def __repr__(self): - parent = None - if type(self._parent) == PropGroup: - parent = self._parent.name() - return "{{ name: '{}', parent: '{}', children: {} }}".format( - self._name, parent, self._children) + def __repr__(self): + parent = None + if type(self._parent) == PropGroup: + parent = self._parent.name() + return "{{ name: '{}', parent: '{}', children: {} }}".format( + self._name, parent, self._children) - def __str__(self): - return 'PropGroup(name="%s")' % self._name + def __str__(self): + return 'PropGroup(name="%s")' % self._name # pylint: disable=abstract-class-not-used class PropBase(object): - """PropBase - makes a list of path-like properties into attr interface + """PropBase - makes a list of path-like properties into attr interface - This class provides a base class for creating arbitrary tree structures of - properties using a path-like definition. One can set DESIRED_PROPS and - REQUIRED_PROPS, as well as enable or disable CACHING. Only two methods need - to be overriden: _save, and _load. For instance, + This class provides a base class for creating arbitrary tree structures of + properties using a path-like definition. One can set DESIRED_PROPS and + REQUIRED_PROPS, as well as enable or disable CACHING. Only two methods need + to be overriden: _save, and _load. For instance, - class PropDict(PropBase): + class PropDict(PropBase): REQUIRED_PROPS = { 'editor/config/vim': [], 'editor/config/emacs': [], 'current_editor' : ['emacs', 'vim'] } CACHING = True - def __init__(self): - self._d = {} - super(PropDict, self).__init__() - - def _save(self, key, value): - self._d[key] = value - def _load(self, key): - if not self._d.has_key(key): - return '' - return self._d[key] - - Usage then follows: - p = PropDict() - p.editor.config.vim = '$HOME/.vimrc' - p.current_editor = 'vim' - print p.editor.config.emacs - - Note! No properties may start with '_'. - """ - - # Fields of the store that can be accessed as properties. - # { 'field_name': [], # Contains anything - # 'restricted_field' : ['1', '0'] } # Only '0' or '1' - REQUIRED_PROPS = {} - OPTIONAL_PROPS = {} - - CACHING = False - - def __init__(self): - self._root = self.create_prop_tree() - self._cache = {} - - def complete(self): - """Returns true if all expected properties are set.""" - for p in self.REQUIRED_PROPS.keys(): - # We use getattr instead of _load so that results get cached. - if getattr(self, p) is None: - return False - return True + def __init__(self): + self._d = {} + super(PropDict, self).__init__() + + def _save(self, key, value): + self._d[key] = value + def _load(self, key): + if not self._d.has_key(key): + return '' + return self._d[key] - @classmethod - def properties(cls): - """Returns a dict of valid item keys and lists of validation values. - Keys must be bare, e.g., 'foo', or a valid relative path, e.g., - 'foo/bar/baz'. Any intermediate values cannot contain values. + Usage then follows: + p = PropDict() + p.editor.config.vim = '$HOME/.vimrc' + p.current_editor = 'vim' + print p.editor.config.emacs + + Note! No properties may start with '_'. """ - d = cls.OPTIONAL_PROPS.copy() - d.update(cls.REQUIRED_PROPS) - return d - def _load(self, key): - """Returns the correct value for the given |key|.""" - raise NotImplementedError + # Fields of the store that can be accessed as properties. + # { 'field_name': [], # Contains anything + # 'restricted_field' : ['1', '0'] } # Only '0' or '1' + REQUIRED_PROPS = {} + OPTIONAL_PROPS = {} - def _save(self, key, val): - """Sets the backing for the |key| to the given |val|. + CACHING = False - Returns: - The saved value, in case any manipultions were made. - """ - raise NotImplementedError - - def __getattr__(self, key): - if key.startswith('_'): - return super(PropBase, self).__getattr__(key) - entry = string.split(key, '/') - if self._root.has_child(entry[0]): - c = self._root.child(entry[0]) - if type(c) == PropEntry: - result = None - # If not self.CACHING, self._cache is empty so this is false. - if c in self._cache: - result = self._cache[c] + def __init__(self): + self._root = self.create_prop_tree() + self._cache = {} + + def complete(self): + """Returns true if all expected properties are set.""" + for p in self.REQUIRED_PROPS.keys(): + # We use getattr instead of _load so that results get cached. + if getattr(self, p) is None: + return False + return True + + @classmethod + def properties(cls): + """Returns a dict of valid item keys and lists of validation values. + + Keys must be bare, e.g., 'foo', or a valid relative path, e.g., + 'foo/bar/baz'. Any intermediate values cannot contain values. + """ + d = cls.OPTIONAL_PROPS.copy() + d.update(cls.REQUIRED_PROPS) + return d + + def _load(self, key): + """Returns the correct value for the given |key|.""" + raise NotImplementedError + + def _save(self, key, val): + """Sets the backing for the |key| to the given |val|. + + Returns: + The saved value, in case any manipultions were made. + """ + raise NotImplementedError + + def __getattr__(self, key): + if key.startswith('_'): + return super(PropBase, self).__getattr__(key) + entry = string.split(key, '/') + if self._root.has_child(entry[0]): + c = self._root.child(entry[0]) + if type(c) == PropEntry: + result = None + # If not self.CACHING, self._cache is empty so this is false. + if c in self._cache: + result = self._cache[c] + else: + result = c.get() + # For space efficiency, only cache if desired. + if self.CACHING: + self._cache[c] = result + return result + return c + raise AttributeError( + "AttributeError: '%s' object has no attribute '%s'" % (type(self), + key)) + + def __setattr__(self, key, val): + if key.startswith('_'): + super(PropBase, self).__setattr__(key, val) else: - result = c.get() - # For space efficiency, only cache if desired. - if self.CACHING: - self._cache[c] = result - return result - return c - raise AttributeError, \ - "AttributeError: '%s' object has no attribute '%s'" % (type(self), key) - - def __setattr__(self, key, val): - if key.startswith('_'): - super(PropBase, self).__setattr__(key, val) - else: - if self._root.has_child(key): - c = self._root.child(key) - if type(c) == PropEntry: - result = self._cache.get(c) - if val != result: - result = c.set(val) - if self.CACHING: - self._cache[c] = result - return result - raise AttributeError, \ - "AttributeError: '%s' object has no attribute '%s'" % (type(self), key) - - def create_prop_tree(self): - """Creates a tree of PropGroups and PropEntrys from properties()""" - root = PropGroup('root', self) - for k, v in self.properties().iteritems(): - entry = string.split(k, '/') - entry.reverse() - component = root - prefix = '' - while len(entry) > 1: - name = entry.pop() - prefix = string.join([prefix, name], '.') - if not component.has_child(name): - component.add_child(PropGroup(name, component)) - component = component.child(name) - # leaf - component.add_child(PropEntry(entry[0], - k, v, - self._save, self._load)) - return root - - def dict(self): - """Returns a fully populated a dict""" - d = {} - for p in self.properties(): - d[p] = self._load(p) - return d + if self._root.has_child(key): + c = self._root.child(key) + if type(c) == PropEntry: + result = self._cache.get(c) + if val != result: + result = c.set(val) + if self.CACHING: + self._cache[c] = result + return result + raise AttributeError( + "AttributeError: '%s' object has no attribute '%s'" % ( + type(self), key)) + + def create_prop_tree(self): + """Creates a tree of PropGroups and PropEntrys from properties()""" + root = PropGroup('root', self) + for k, v in self.properties().iteritems(): + entry = string.split(k, '/') + entry.reverse() + component = root + prefix = '' + while len(entry) > 1: + name = entry.pop() + prefix = string.join([prefix, name], '.') + if not component.has_child(name): + component.add_child(PropGroup(name, component)) + component = component.child(name) + # leaf + component.add_child(PropEntry(entry[0], + k, v, + self._save, self._load)) + return root + + def dict(self): + """Returns a fully populated a dict""" + d = {} + for p in self.properties(): + d[p] = self._load(p) + return d diff --git a/cli/lib/core/provision.py b/cli/lib/core/provision.py index a621456..c8f5258 100644 --- a/cli/lib/core/provision.py +++ b/cli/lib/core/provision.py @@ -27,136 +27,138 @@ import error class Error(error.Error): - """Base provision error class.""" + """Base provision error class.""" class MissingBuildError(Error): - """Raised when the required binaries have not yet been built.""" + """Raised when the required binaries have not yet been built.""" SYSTEM_IMAGE_FILENAME = 'system.img' class _ProvisionDir(object): - """Creates a directory to run `provision-device` from. + """Creates a directory to run `provision-device` from. - This replicates the platform build directory using symlinks, except - that system.img will be replaced with a link to a custom file if - one is available. + This replicates the platform build directory using symlinks, except + that system.img will be replaced with a link to a custom file if + one is available. - This is meant to be used as a context manager so that it's easy to - be sure the directory is cleaned up before exiting. - """ - - def __init__(self, product_build_out, system_image_dir=None): - """Creates a temp dir with the necessary files to provision. - - Args: - product_build_out: product build output directory. - system_image_dir: directory containing the custom system.img file - to use. If None or system.img does not exist, uses the one in - platform_build_out. + This is meant to be used as a context manager so that it's easy to + be sure the directory is cleaned up before exiting. """ - self.temp_dir = tempfile.mkdtemp(prefix='bdk-flash-') - - try: - # Look for a custom system.img we should use instead. - custom_image_file = None - if system_image_dir: - image_path = os.path.join(system_image_dir, SYSTEM_IMAGE_FILENAME) - if os.path.isfile(image_path): - custom_image_file = image_path - - self._fill_temp_dir(product_build_out, custom_image_file) - except: - # Wipe the directory if anything goes wrong. - self.cleanup() - raise - - def _fill_temp_dir(self, product_build_out, custom_image_file): - """Fills the temporary directory with the required symlinks.""" - # Not everything in product_build_out is necessary, but it's easier to - # just link it all and provision-device will pick up whatever it needs. - for filename in os.listdir(product_build_out): - link_dst = os.path.join(self.temp_dir, filename) - - # Replace the default system.img if we have a custom one. - if custom_image_file and filename == SYSTEM_IMAGE_FILENAME: - link_src = custom_image_file - else: - link_src = os.path.join(product_build_out, filename) - - os.symlink(link_src, link_dst) - def __enter__(self): - return self.temp_dir - - def __exit__(self, exception_type, value, traceback): - self.cleanup() - - def cleanup(self): - shutil.rmtree(self.temp_dir, ignore_errors=True) + def __init__(self, product_build_out, system_image_dir=None): + """Creates a temp dir with the necessary files to provision. + + Args: + product_build_out: product build output directory. + system_image_dir: directory containing the custom system.img file + to use. If None or system.img does not exist, uses the one in + platform_build_out. + """ + self.temp_dir = tempfile.mkdtemp(prefix='bdk-flash-') + + try: + # Look for a custom system.img we should use instead. + custom_image_file = None + if system_image_dir: + image_path = os.path.join(system_image_dir, + SYSTEM_IMAGE_FILENAME) + if os.path.isfile(image_path): + custom_image_file = image_path + + self._fill_temp_dir(product_build_out, custom_image_file) + except: + # Wipe the directory if anything goes wrong. + self.cleanup() + raise + + def _fill_temp_dir(self, product_build_out, custom_image_file): + """Fills the temporary directory with the required symlinks.""" + # Not everything in product_build_out is necessary, but it's easier to + # just link it all and provision-device will pick up whatever it needs. + for filename in os.listdir(product_build_out): + link_dst = os.path.join(self.temp_dir, filename) + + # Replace the default system.img if we have a custom one. + if custom_image_file and filename == SYSTEM_IMAGE_FILENAME: + link_src = custom_image_file + else: + link_src = os.path.join(product_build_out, filename) + + os.symlink(link_src, link_dst) + + def __enter__(self): + return self.temp_dir + + def __exit__(self, exception_type, value, traceback): + self.cleanup() + + def cleanup(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) def _get_provision_tool(os_source_dir, platform_build_out, bsp): - """Creates a ProvisionDeviceTool with proper settings. + """Creates a ProvisionDeviceTool with proper settings. - Args: - os_source_dir: the Brillo source root, needed by provision to - locate prebuilt vendor binaries. - platform_build_out: platform build output directory. - bsp: BSP name. + Args: + os_source_dir: the Brillo source root, needed by provision to + locate prebuilt vendor binaries. + platform_build_out: platform build output directory. + bsp: BSP name. - Returns: - The created ProvisionDeviceTool. + Returns: + The created ProvisionDeviceTool. - Raises: - MissingBuildError: the platform has not been built yet. - """ - # The provision-device script needs fastboot to be on PATH, so we create - # a fastboot tool to add it's parent directory to the provision tool PATH. - fastboot_tool = tool.HostToolWrapper('fastboot', platform_build_out) - provision_tool = tool.ProvisionDeviceTool( - os_source_dir, platform_build_out, bsp, - env={'PATH': os.path.dirname(fastboot_tool.path())}) + Raises: + MissingBuildError: the platform has not been built yet. + """ + # The provision-device script needs fastboot to be on PATH, so we create + # a fastboot tool to add it's parent directory to the provision tool PATH. + fastboot_tool = tool.HostToolWrapper('fastboot', platform_build_out) + provision_tool = tool.ProvisionDeviceTool( + os_source_dir, platform_build_out, bsp, + env={'PATH': os.path.dirname(fastboot_tool.path())}) - for t in (fastboot_tool, provision_tool): - if not t.exists(): - raise MissingBuildError( - '{} does not exist; use `bdk build platform` first'.format(t.path())) + for t in (fastboot_tool, provision_tool): + if not t.exists(): + raise MissingBuildError( + '{} does not exist; use `bdk build platform` first'.format( + t.path())) - return provision_tool + return provision_tool def provision_device(target, platform_build_out, system_image_dir=None, provision_args=None): - """Provisions the attached device using the `provision-device` script. - - Requires that the platform has been built so that the image files, - fastboot, and provision-device all exist in platform_build_out. - - Args: - target: the project Target object. - platform_build_out: the root of the platform build output. - system_image_dir: directory containing the custom system.img to - flash. If None or the system.img file does not exist, uses - the default system.img from platform_build_out instead. - provision_args: list of string args to pass to provision-device. - - Returns: - The `provision-device` exit code. - - Raises: - MissingBuildError: the platform has not been built yet. - core.util.OSVersionError: the target requests an invalid os version. - """ - os_source_dir = util.GetOSPath(target.os_version) - provision_tool = _get_provision_tool(os_source_dir, platform_build_out, - target.board) - - # Combine the platform ANDROID_PRODUCT_OUT directory with our system.img. - with _ProvisionDir(provision_tool.environment['ANDROID_PRODUCT_OUT'], - system_image_dir) as provision_dir: - # Point provision-device to the new directory. - provision_tool.environment['ANDROID_PRODUCT_OUT'] = provision_dir - return provision_tool.run(provision_args) + """Provisions the attached device using the `provision-device` script. + + Requires that the platform has been built so that the image files, + fastboot, and provision-device all exist in platform_build_out. + + Args: + target: the project Target object. + platform_build_out: the root of the platform build output. + system_image_dir: directory containing the custom system.img to + flash. If None or the system.img file does not exist, uses + the default system.img from platform_build_out instead. + provision_args: list of string args to pass to provision-device. + + Returns: + The `provision-device` exit code. + + Raises: + MissingBuildError: the platform has not been built yet. + core.util.OSVersionError: the target requests an invalid os version. + """ + os_source_dir = util.GetOSPath(target.os_version) + provision_tool = _get_provision_tool(os_source_dir, platform_build_out, + target.board) + + # Combine the platform ANDROID_PRODUCT_OUT directory with our system.img. + with _ProvisionDir(provision_tool.environment['ANDROID_PRODUCT_OUT'], + system_image_dir) as provision_dir: + # Point provision-device to the new directory. + provision_tool.environment['ANDROID_PRODUCT_OUT'] = provision_dir + return provision_tool.run(provision_args) diff --git a/cli/lib/core/provision_unittest.py b/cli/lib/core/provision_unittest.py index fd91115..d5dc10b 100644 --- a/cli/lib/core/provision_unittest.py +++ b/cli/lib/core/provision_unittest.py @@ -29,181 +29,189 @@ from test import stubs class ProvisionDeviceTest(unittest.TestCase): - """Tests for provision_device().""" - - _BSP = 'boardname' - _CUSTOM_SYSTEM_IMAGE_DIR = '/foo' - _CUSTOM_SYSTEM_IMAGE_PATH = os.path.join(_CUSTOM_SYSTEM_IMAGE_DIR, - 'system.img') - _HOST_ARCH = 'foo_arch' - _PLATFORM_BUILD_OUT = '/platform' - - # Necessary paths derived from our constants. - _PRODUCT_BUILD_OUT = util.GetAndroidProductOut(_PLATFORM_BUILD_OUT, _BSP) - _PROVISION_DEVICE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'provision-device') - _DEFAULT_SYSTEM_IMAGE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'system.img') - - _HOST_BUILD_OUT = os.path.join(_PLATFORM_BUILD_OUT, 'host', _HOST_ARCH) - _FASTBOOT_PATH = os.path.join(_HOST_BUILD_OUT, 'bin', 'fastboot') - - def setUp(self): - """Overrides imported modules with stubs.""" - self.stub_os = stubs.StubOs() - self.stub_shutil = stubs.StubShutil(self.stub_os) - self.stub_tempfile = stubs.StubTempfile(self.stub_os) - self.stub_subprocess = stubs.StubSubprocess() - self.stub_util = util_stub.StubUtil(arch=self._HOST_ARCH) - - provision.os = self.stub_os - provision.shutil = self.stub_shutil - provision.tempfile = self.stub_tempfile - provision.util = self.stub_util - - provision.tool.os = self.stub_os - provision.tool.subprocess = self.stub_subprocess - provision.tool.util = self.stub_util - - self.target = target_stub.StubTarget( - board=self._BSP, os_version=self.stub_util.GetOSVersion()) - - # Give more obvious names to some commonly used variables. - self.provision_temp_dir = self.stub_tempfile.temp_dir - self.system_image_link = self.stub_os.path.join(self.provision_temp_dir, - 'system.img') - - def tearDown(self): - """Performs shared end-of-test logic.""" - # No matter the result, the temp dir should not exist afterwards. - self.assertFalse(self.stub_os.path.exists(self.provision_temp_dir)) - self.assertFalse(self.stub_os.path.isdir(self.provision_temp_dir)) - - def prepare_os_files(self): - """Creates the necessary files for provision_device().""" - self.stub_os.path.should_be_dir.append(self._PRODUCT_BUILD_OUT) - self.stub_os.path.should_exist += [self._FASTBOOT_PATH, - self._PROVISION_DEVICE_PATH, - self._CUSTOM_SYSTEM_IMAGE_PATH] - self.stub_os.should_create_link.append(( - self._PROVISION_DEVICE_PATH, - os.path.join(self.provision_temp_dir, 'provision-device'))) - self.stub_os.should_makedirs.append(self.provision_temp_dir) - - def test_call(self): - """Tests a successful provision-device call.""" - self.prepare_os_files() - command = self.stub_subprocess.AddCommand() - - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) - # provision-device environment must have: - # * PATH contain the fastboot executable. - # * ANDROID_BUILD_TOP point to the OS source root. - # * ANDROID_PRODUCT_OUT point to our temporary product dir. - command.AssertCallWas( - [self._PROVISION_DEVICE_PATH], shell=False, cwd=None, - stdout=None, stderr=None, - env={'PATH': self.stub_os.path.dirname(self._FASTBOOT_PATH), - 'ANDROID_BUILD_TOP': self.stub_util.GetOSPath( - self.target.os_version), - 'ANDROID_HOST_OUT': self._HOST_BUILD_OUT, - 'ANDROID_PRODUCT_OUT': self.provision_temp_dir}) - - def test_caller_env_path(self): - """Tests that provision-device can access the caller's PATH.""" - self.prepare_os_files() - self.stub_os.environ['PATH'] = '/foo/bar:/baz' - command = self.stub_subprocess.AddCommand() - - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) - self.assertEqual( - self.stub_os.path.dirname(self._FASTBOOT_PATH) + ':/foo/bar:/baz', - command.GetCallArgs()[1]['env']['PATH']) - - def test_product_build_symlinks(self): - """Tests that the temp dir symlinks are generated.""" - self.prepare_os_files() - # Put a few extra files in the product build directory as symlink targets. - file_names = ('foo.txt', 'system.img', 'cache', 'bar000') - for name in file_names: - path = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name) - self.stub_os.path.should_exist.append(path) - self.stub_os.should_create_link.append(( - path, os.path.join(self.provision_temp_dir, name))) - - # Helper function to check that symlinks exist. - def check_symlinks(): - for name in file_names: - link_target = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name) - link_path = self.stub_os.path.join(self.provision_temp_dir, name) - self.assertEqual(link_target, self.stub_os.readlink(link_path)) - - # Run the symlink check as a side effect of the subprocess because by the - # time we get control back here the temp dir will have been cleaned up. - self.stub_subprocess.AddCommand(side_effect=check_symlinks) - - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) - - def test_custom_system_image(self): - """Tests that a custom system.img is used if provided.""" - self.prepare_os_files() - self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH) - self.stub_os.should_create_link.append(( - self._CUSTOM_SYSTEM_IMAGE_PATH, - os.path.join(self.provision_temp_dir, 'system.img'))) - - # Check the link points to the custom image, not the default system image. - def check_custom_image_link(): - self.assertEqual(self._CUSTOM_SYSTEM_IMAGE_PATH, - self.stub_os.readlink(self.system_image_link)) - self.stub_subprocess.AddCommand(side_effect=check_custom_image_link) - - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT, - system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR) - - def test_custom_system_image_missing(self): - """Tests specifying a nonexistent custom system.img.""" - self.prepare_os_files() - self.stub_os.path.should_exist.remove(self._CUSTOM_SYSTEM_IMAGE_PATH) - self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH) - self.stub_os.should_create_link.append(( - self._DEFAULT_SYSTEM_IMAGE_PATH, - os.path.join(self.provision_temp_dir, 'system.img'))) - - # In this case, even though the manifest may give a potential output - # directory, the user has not built a custom system.img yet, so we should - # be linking to the default system.img. - def check_default_image_link(): - self.assertEqual(self._DEFAULT_SYSTEM_IMAGE_PATH, - self.stub_os.readlink(self.system_image_link)) - self.stub_subprocess.AddCommand(side_effect=check_default_image_link) - - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT, - system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR) - - def test_provision_exit_code(self): - """Tests non-zero exit codes are propagated.""" - self.prepare_os_files() - # Test an errno other than the default. - errno = error.GENERIC_ERRNO + 1 - self.stub_subprocess.AddCommand(ret_code=errno) - - with self.assertRaises(error.Error) as e: - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) - self.assertEqual(errno, e.exception.errno) - - def test_fastboot_missing(self): - """Tests failure when fastboot is missing.""" - self.prepare_os_files() - self.stub_os.path.should_exist.remove(self._FASTBOOT_PATH) - self.stub_subprocess.AddCommand() - - with self.assertRaises(provision.MissingBuildError): - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) - - def test_provision_device_missing(self): - """Tests failure when provision-device is missing.""" - self.prepare_os_files() - self.stub_os.path.should_exist.remove(self._PROVISION_DEVICE_PATH) - self.stub_subprocess.AddCommand() - - with self.assertRaises(provision.MissingBuildError): - provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + """Tests for provision_device().""" + + _BSP = 'boardname' + _CUSTOM_SYSTEM_IMAGE_DIR = '/foo' + _CUSTOM_SYSTEM_IMAGE_PATH = os.path.join(_CUSTOM_SYSTEM_IMAGE_DIR, + 'system.img') + _HOST_ARCH = 'foo_arch' + _PLATFORM_BUILD_OUT = '/platform' + + # Necessary paths derived from our constants. + _PRODUCT_BUILD_OUT = util.GetAndroidProductOut(_PLATFORM_BUILD_OUT, _BSP) + _PROVISION_DEVICE_PATH = os.path.join(_PRODUCT_BUILD_OUT, + 'provision-device') + _DEFAULT_SYSTEM_IMAGE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'system.img') + + _HOST_BUILD_OUT = os.path.join(_PLATFORM_BUILD_OUT, 'host', _HOST_ARCH) + _FASTBOOT_PATH = os.path.join(_HOST_BUILD_OUT, 'bin', 'fastboot') + + def setUp(self): + """Overrides imported modules with stubs.""" + self.stub_os = stubs.StubOs() + self.stub_shutil = stubs.StubShutil(self.stub_os) + self.stub_tempfile = stubs.StubTempfile(self.stub_os) + self.stub_subprocess = stubs.StubSubprocess() + self.stub_util = util_stub.StubUtil(arch=self._HOST_ARCH) + + provision.os = self.stub_os + provision.shutil = self.stub_shutil + provision.tempfile = self.stub_tempfile + provision.util = self.stub_util + + provision.tool.os = self.stub_os + provision.tool.subprocess = self.stub_subprocess + provision.tool.util = self.stub_util + + self.target = target_stub.StubTarget( + board=self._BSP, os_version=self.stub_util.GetOSVersion()) + + # Give more obvious names to some commonly used variables. + self.provision_temp_dir = self.stub_tempfile.temp_dir + self.system_image_link = self.stub_os.path.join(self.provision_temp_dir, + 'system.img') + + def tearDown(self): + """Performs shared end-of-test logic.""" + # No matter the result, the temp dir should not exist afterwards. + self.assertFalse(self.stub_os.path.exists(self.provision_temp_dir)) + self.assertFalse(self.stub_os.path.isdir(self.provision_temp_dir)) + + def prepare_os_files(self): + """Creates the necessary files for provision_device().""" + self.stub_os.path.should_be_dir.append(self._PRODUCT_BUILD_OUT) + self.stub_os.path.should_exist += [self._FASTBOOT_PATH, + self._PROVISION_DEVICE_PATH, + self._CUSTOM_SYSTEM_IMAGE_PATH] + self.stub_os.should_create_link.append(( + self._PROVISION_DEVICE_PATH, + os.path.join(self.provision_temp_dir, 'provision-device'))) + self.stub_os.should_makedirs.append(self.provision_temp_dir) + + def test_call(self): + """Tests a successful provision-device call.""" + self.prepare_os_files() + command = self.stub_subprocess.AddCommand() + + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + # provision-device environment must have: + # * PATH contain the fastboot executable. + # * ANDROID_BUILD_TOP point to the OS source root. + # * ANDROID_PRODUCT_OUT point to our temporary product dir. + command.AssertCallWas( + [self._PROVISION_DEVICE_PATH], shell=False, cwd=None, + stdout=None, stderr=None, + env={'PATH': self.stub_os.path.dirname(self._FASTBOOT_PATH), + 'ANDROID_BUILD_TOP': self.stub_util.GetOSPath( + self.target.os_version), + 'ANDROID_HOST_OUT': self._HOST_BUILD_OUT, + 'ANDROID_PRODUCT_OUT': self.provision_temp_dir}) + + def test_caller_env_path(self): + """Tests that provision-device can access the caller's PATH.""" + self.prepare_os_files() + self.stub_os.environ['PATH'] = '/foo/bar:/baz' + command = self.stub_subprocess.AddCommand() + + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + self.assertEqual( + self.stub_os.path.dirname(self._FASTBOOT_PATH) + ':/foo/bar:/baz', + command.GetCallArgs()[1]['env']['PATH']) + + def test_product_build_symlinks(self): + """Tests that the temp dir symlinks are generated.""" + self.prepare_os_files() + # Put a few extra files in the product build directory as symlink + # targets. + file_names = ('foo.txt', 'system.img', 'cache', 'bar000') + for name in file_names: + path = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name) + self.stub_os.path.should_exist.append(path) + self.stub_os.should_create_link.append(( + path, os.path.join(self.provision_temp_dir, name))) + + # Helper function to check that symlinks exist. + def check_symlinks(): + for name in file_names: + link_target = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, + name) + link_path = self.stub_os.path.join(self.provision_temp_dir, + name) + self.assertEqual(link_target, self.stub_os.readlink(link_path)) + + # Run the symlink check as a side effect of the subprocess because by + # the time we get control back here the temp dir will have been cleaned + # up. + self.stub_subprocess.AddCommand(side_effect=check_symlinks) + + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + + def test_custom_system_image(self): + """Tests that a custom system.img is used if provided.""" + self.prepare_os_files() + self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH) + self.stub_os.should_create_link.append(( + self._CUSTOM_SYSTEM_IMAGE_PATH, + os.path.join(self.provision_temp_dir, 'system.img'))) + + # Check the link points to the custom image, not the default system + # image. + def check_custom_image_link(): + self.assertEqual(self._CUSTOM_SYSTEM_IMAGE_PATH, + self.stub_os.readlink(self.system_image_link)) + self.stub_subprocess.AddCommand(side_effect=check_custom_image_link) + + provision.provision_device( + self.target, self._PLATFORM_BUILD_OUT, + system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR) + + def test_custom_system_image_missing(self): + """Tests specifying a nonexistent custom system.img.""" + self.prepare_os_files() + self.stub_os.path.should_exist.remove(self._CUSTOM_SYSTEM_IMAGE_PATH) + self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH) + self.stub_os.should_create_link.append(( + self._DEFAULT_SYSTEM_IMAGE_PATH, + os.path.join(self.provision_temp_dir, 'system.img'))) + + # In this case, even though the manifest may give a potential output + # directory, the user has not built a custom system.img yet, so we + # should be linking to the default system.img. + def check_default_image_link(): + self.assertEqual(self._DEFAULT_SYSTEM_IMAGE_PATH, + self.stub_os.readlink(self.system_image_link)) + self.stub_subprocess.AddCommand(side_effect=check_default_image_link) + + provision.provision_device( + self.target, self._PLATFORM_BUILD_OUT, + system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR) + + def test_provision_exit_code(self): + """Tests non-zero exit codes are propagated.""" + self.prepare_os_files() + # Test an errno other than the default. + errno = error.GENERIC_ERRNO + 1 + self.stub_subprocess.AddCommand(ret_code=errno) + + with self.assertRaises(error.Error) as e: + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + self.assertEqual(errno, e.exception.errno) + + def test_fastboot_missing(self): + """Tests failure when fastboot is missing.""" + self.prepare_os_files() + self.stub_os.path.should_exist.remove(self._FASTBOOT_PATH) + self.stub_subprocess.AddCommand() + + with self.assertRaises(provision.MissingBuildError): + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) + + def test_provision_device_missing(self): + """Tests failure when provision-device is missing.""" + self.prepare_os_files() + self.stub_os.path.should_exist.remove(self._PROVISION_DEVICE_PATH) + self.stub_subprocess.AddCommand() + + with self.assertRaises(provision.MissingBuildError): + provision.provision_device(self.target, self._PLATFORM_BUILD_OUT) diff --git a/cli/lib/core/timer.py b/cli/lib/core/timer.py index 3a5ca9a..3081c92 100644 --- a/cli/lib/core/timer.py +++ b/cli/lib/core/timer.py @@ -21,69 +21,71 @@ import time class Timer(object): - """A simple object to time various things. - - Attributes: - name - The name of what is being timed. - label - (optional) An additional text label for the timing. Default None. - """ - - _all_timers = [] - _group_paused_timers = [] - - @classmethod - def MassPauseRunningTimers(cls): - """Pauses all running timers.""" - timers_to_pause = [t for t in cls._all_timers if t.IsRunning()] - for t in timers_to_pause: - t.Stop() - cls._group_paused_timers += timers_to_pause - - @classmethod - def ResumeMassPausedTimers(cls): - """Resumes all timers that were paused using cls.MassPauseRunningTimers()""" - for t in cls._group_paused_timers: - t.Start() - cls._group_paused_timers = [] - - def __init__(self, name, label=None): - self._all_timers.append(self) - self.name = name - self.label = label - self._start_time = None - self.Reset() - - def Reset(self): - """Resets the timer, stopping it and setting it to 0.""" - self._time = 0 - self._start_time = None - - def Start(self): - """Starts the timer continuing from whatever time is showing. - - Does nothing to a timer that is already running. - """ - if self._start_time is None: - self._start_time = time.time() - - def Stop(self): - """Stops the timer, leaving its current amount of time showing.""" - if self._start_time is not None: - self._time += time.time() - self._start_time - self._start_time = None - - def IsRunning(self): - return self._start_time is not None - - def Read(self): - """Read the current time of the timer in seconds with float precision. + """A simple object to time various things. - Works while running, but best practice would be to Stop first. + Attributes: + name: The name of what is being timed. + label: (optional) An additional text label for the timing. Default None. """ - # Amount of time previously timed. - result = self._time - # If currently running, look at the time elapsed since last stop. - if self._start_time is not None: - result += time.time() - self._start_time - return result + _all_timers = [] + _group_paused_timers = [] + + @classmethod + def MassPauseRunningTimers(cls): + """Pauses all running timers.""" + timers_to_pause = [t for t in cls._all_timers if t.IsRunning()] + for t in timers_to_pause: + t.Stop() + cls._group_paused_timers += timers_to_pause + + @classmethod + def ResumeMassPausedTimers(cls): + """Resumes all timers that were paused using + cls.MassPauseRunningTimers() + """ + for t in cls._group_paused_timers: + t.Start() + cls._group_paused_timers = [] + + def __init__(self, name, label=None): + self._all_timers.append(self) + self.name = name + self.label = label + self._start_time = None + self.Reset() + + def Reset(self): + """Resets the timer, stopping it and setting it to 0.""" + self._time = 0 + self._start_time = None + + def Start(self): + """Starts the timer continuing from whatever time is showing. + + Does nothing to a timer that is already running. + """ + if self._start_time is None: + self._start_time = time.time() + + def Stop(self): + """Stops the timer, leaving its current amount of time showing.""" + if self._start_time is not None: + self._time += time.time() - self._start_time + self._start_time = None + + def IsRunning(self): + return self._start_time is not None + + def Read(self): + """Read the current time of the timer in seconds with float precision. + + Works while running, but best practice would be to Stop first. + """ + # Amount of time previously timed. + result = self._time + # If currently running, look at the time elapsed since last stop. + if self._start_time is not None: + result += time.time() - self._start_time + + return result diff --git a/cli/lib/core/timer_unittest.py b/cli/lib/core/timer_unittest.py index b8375f7..dd32a65 100644 --- a/cli/lib/core/timer_unittest.py +++ b/cli/lib/core/timer_unittest.py @@ -25,113 +25,113 @@ from test import stubs class TimerTest(unittest.TestCase): - def setUp(self): - self.stub_time = stubs.StubTime() - - timer.time = self.stub_time - - def RunningTimerWithTime(self, time): - """Returns a timer running with a given amount of time already on it. - - The timer will have been started at stub_time = 0. - """ - self.stub_time.current_time = 0 - t = timer.Timer('event') - t.Start() - self.stub_time.current_time = time - # Confirm that this is indeed a running timer with the desired time. - self.assertTrue(t.IsRunning()) - self.assertAlmostEqual(t.Read(), time) - return t - - def StoppedTimerWithTime(self, time): - """Returns a timer stopped with a given amount of time already on it.""" - t = self.RunningTimerWithTime(time) - t.Stop() - # Confirm that this is indeed a stopped timer with the desired time. - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), time) - return t - - def test_init(self): - # Time at init shouldn't matter. - self.stub_time.current_time = 123.45 - t = timer.Timer('event') - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 0) - - def test_stopped_read(self): - t = self.StoppedTimerWithTime(12.3) - self.assertAlmostEqual(t.Read(), 12.3) - # Reading shouldn't change running state. - self.assertFalse(t.IsRunning()) - - # Even if time is progressed, it should still read the same value. - self.stub_time.current_time = 45.6 - self.assertAlmostEqual(t.Read(), 12.3) - - def test_stopped_start(self): - t = self.StoppedTimerWithTime(12.3) - t.Start() - self.assertTrue(t.IsRunning()) - # Should not reset time. - self.assertAlmostEqual(t.Read(), 12.3) - - def test_stopped_reset(self): - t = self.StoppedTimerWithTime(12.3) - t.Reset() - # Reset should stop and set to 0. - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 0) - - def test_stopped_stop(self): - t = self.StoppedTimerWithTime(12.3) - t.Stop() - # Stop should have no further effect. - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 12.3) - - def test_running_read(self): - t = self.RunningTimerWithTime(12.3) - self.assertAlmostEqual(t.Read(), 12.3) - # Read should not change running state. - self.assertTrue(t.IsRunning()) - - # If time is progressed, it should read the appropriate value. - self.stub_time.current_time = 45.6 - self.assertAlmostEqual(t.Read(), 45.6) - - def test_running_start(self): - t = self.RunningTimerWithTime(12.3) - t.Start() - # Should have no effect. - self.assertTrue(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 12.3) - - def test_running_reset(self): - t = self.RunningTimerWithTime(12.3) - t.Reset() - # Reset should stop and set to 0. - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 0) - - def test_running_stop(self): - t = self.RunningTimerWithTime(12.3) - t.Stop() - # Stop should stop but maintain time. - self.assertFalse(t.IsRunning()) - self.assertAlmostEqual(t.Read(), 12.3) - - def test_combined_read(self): - t = self.StoppedTimerWithTime(12.3) - self.stub_time.current_time = 20 - t.Start() - self.stub_time.current_time = 45.6 - expected_read_time = 12.3 + (45.6 - 20) - # When a timer has been previously stopped, current running time should be - # additive. - self.assertAlmostEqual(t.Read(), expected_read_time) - # Stopping again shouldn't overwrite this number. - t.Stop() - self.stub_time.current_time = 56.7 - self.assertAlmostEqual(t.Read(), expected_read_time) + def setUp(self): + self.stub_time = stubs.StubTime() + + timer.time = self.stub_time + + def RunningTimerWithTime(self, time): + """Returns a timer running with a given amount of time already on it. + + The timer will have been started at stub_time = 0. + """ + self.stub_time.current_time = 0 + t = timer.Timer('event') + t.Start() + self.stub_time.current_time = time + # Confirm that this is indeed a running timer with the desired time. + self.assertTrue(t.IsRunning()) + self.assertAlmostEqual(t.Read(), time) + return t + + def StoppedTimerWithTime(self, time): + """Returns a timer stopped with a given amount of time already on it.""" + t = self.RunningTimerWithTime(time) + t.Stop() + # Confirm that this is indeed a stopped timer with the desired time. + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), time) + return t + + def test_init(self): + # Time at init shouldn't matter. + self.stub_time.current_time = 123.45 + t = timer.Timer('event') + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 0) + + def test_stopped_read(self): + t = self.StoppedTimerWithTime(12.3) + self.assertAlmostEqual(t.Read(), 12.3) + # Reading shouldn't change running state. + self.assertFalse(t.IsRunning()) + + # Even if time is progressed, it should still read the same value. + self.stub_time.current_time = 45.6 + self.assertAlmostEqual(t.Read(), 12.3) + + def test_stopped_start(self): + t = self.StoppedTimerWithTime(12.3) + t.Start() + self.assertTrue(t.IsRunning()) + # Should not reset time. + self.assertAlmostEqual(t.Read(), 12.3) + + def test_stopped_reset(self): + t = self.StoppedTimerWithTime(12.3) + t.Reset() + # Reset should stop and set to 0. + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 0) + + def test_stopped_stop(self): + t = self.StoppedTimerWithTime(12.3) + t.Stop() + # Stop should have no further effect. + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 12.3) + + def test_running_read(self): + t = self.RunningTimerWithTime(12.3) + self.assertAlmostEqual(t.Read(), 12.3) + # Read should not change running state. + self.assertTrue(t.IsRunning()) + + # If time is progressed, it should read the appropriate value. + self.stub_time.current_time = 45.6 + self.assertAlmostEqual(t.Read(), 45.6) + + def test_running_start(self): + t = self.RunningTimerWithTime(12.3) + t.Start() + # Should have no effect. + self.assertTrue(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 12.3) + + def test_running_reset(self): + t = self.RunningTimerWithTime(12.3) + t.Reset() + # Reset should stop and set to 0. + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 0) + + def test_running_stop(self): + t = self.RunningTimerWithTime(12.3) + t.Stop() + # Stop should stop but maintain time. + self.assertFalse(t.IsRunning()) + self.assertAlmostEqual(t.Read(), 12.3) + + def test_combined_read(self): + t = self.StoppedTimerWithTime(12.3) + self.stub_time.current_time = 20 + t.Start() + self.stub_time.current_time = 45.6 + expected_read_time = 12.3 + (45.6 - 20) + # When a timer has been previously stopped, current running time should + # be additive. + self.assertAlmostEqual(t.Read(), expected_read_time) + # Stopping again shouldn't overwrite this number. + t.Stop() + self.stub_time.current_time = 56.7 + self.assertAlmostEqual(t.Read(), expected_read_time) diff --git a/cli/lib/core/tool.py b/cli/lib/core/tool.py index 699dbef..96f1da7 100644 --- a/cli/lib/core/tool.py +++ b/cli/lib/core/tool.py @@ -26,17 +26,17 @@ import error class Error(error.Error): - """Base class for all tool errors.""" + """Base class for all tool errors.""" class ExecuteError(Error): - """Raised when the tool fails to execute.""" - description = 'Failed to execute' + """Raised when the tool fails to execute.""" + description = 'Failed to execute' class ReturnError(Error): - """Raised when the tool returns an error code.""" - description = 'Error code returned from tool' + """Raised when the tool returns an error code.""" + description = 'Error code returned from tool' # Variables to pass through to all command calls. These are mostly needed @@ -48,295 +48,300 @@ DEFAULT_PASSTHROUGH_ENV = [ class ToolWrapper(object): - """Wraps a host binary, target script, or build command. - - The advantages of this over using subprocess directly are: - * Properly sets the execution working directory with set_cwd(). - * Strips '--' argument if given. - * Restricts passthrough environment to a safe set of defaults. - * Handles signal return codes properly. - * Helper function to set common environment variables. - - Attributes: - environment: a dictionary of environment variables to pass. - """ - - def __init__(self, path, env=None): - """Initializes a ToolWrapper. - - Args: - path: path to the tool executable. - env: a dictionary of additional environmental variables to set. - Can also be set after creation via the environment attribute. - """ - self._tool_path = path - self._cwd = None - self.environment = {var: os.environ[var] for var in DEFAULT_PASSTHROUGH_ENV - if var in os.environ} - if env: - self.environment.update(env) - - def set_cwd(self, cwd): - self._cwd = cwd - - def set_android_environment(self, source_top=None, build_out=None, bsp=None): - """Sets Android environment variables. - - Android has a few common variables used by a variety of tools. This - function sets any environment variables that can be derived from - the given arguments. - - All arguments are optional, any variables that depend on missing - arguments will not be modified. Requirements are: - ANDROID_BUILD_TOP: source_top - ANDROID_HOST_OUT: build_out - ANDROID_PRODUCT_OUT: build_out, bsp - - Args: - source_top: root of the source tree. - build_out: root of the build output folder. - bsp: the BSP name. - """ - if source_top: - self.environment['ANDROID_BUILD_TOP'] = source_top - - if build_out: - self.environment['ANDROID_HOST_OUT'] = os.path.join( - build_out, 'host', util.GetHostArch()) - - if build_out and bsp: - self.environment['ANDROID_PRODUCT_OUT'] = util.GetAndroidProductOut( - build_out, bsp) - - def add_caller_env_path(self): - """Adds the caller's PATH environment variable. + """Wraps a host binary, target script, or build command. - Most tools do not want to inherit PATH to avoid any unexpected - interactions with the caller's environment. However, some tools - do need the PATH variable (e.g. shell scripts may need to be able - to find utilities like dirname). + The advantages of this over using subprocess directly are: + * Properly sets the execution working directory with set_cwd(). + * Strips '--' argument if given. + * Restricts passthrough environment to a safe set of defaults. + * Handles signal return codes properly. + * Helper function to set common environment variables. - If the tool already has a PATH, the caller's PATH is appended. + Attributes: + environment: a dictionary of environment variables to pass. """ - caller_path = os.environ.get('PATH') - if caller_path is not None: - if 'PATH' in self.environment: - self.environment['PATH'] += os.pathsep + caller_path - else: - self.environment['PATH'] = caller_path - - - def path(self): - return self._tool_path - - def exists(self): - return os.path.isfile(self._tool_path) - - def run(self, arg_array=None, piped=False): - """Executes the tool and blocks until completion. - Args: - arg_array: list of string arguments to pass to the tool. - piped: If true, send stdout and stderr to pipes. - - Raises: - ExecuteError: if execution fails. - ReturnError: if execution returns a non-0 exit code. - - Returns: - (out, err): The output to stdout and stderr of the called tool. - Will both be None if piped=False. - """ - # Remove -- passthrough indicator from arg_array. - if arg_array and arg_array[0] == '--': - arg_array = arg_array[1:] - - # Make sure PWD is accurate on CWD change. - if self._cwd: - self.environment['PWD'] = os.path.abspath(self._cwd) - - stdout = None - stderr = None - if piped: - stdout = subprocess.PIPE - stderr = subprocess.PIPE - - try: - tool_process = subprocess.Popen([self._tool_path] + (arg_array or []), - env=self.environment, shell=False, - cwd=self._cwd, stdout=stdout, - stderr=stderr) - (out, err) = tool_process.communicate() - except OSError as e: - # Catch and re-raise so we can include the tool path in the message. - raise ExecuteError('"{}": {} [{}]'.format( - self._tool_path, e.errno, e.strerror)) - - # Exiting via signal gives negative return codes. - ret = tool_process.returncode - if ret < 0: - # Return the normal shell exit mask for being signaled. - ret = 128 - ret - if ret != 0: - raise ReturnError('"{}": {} ({})'.format(self._tool_path, ret, err), - errno=ret) - - return (out, err) + def __init__(self, path, env=None): + """Initializes a ToolWrapper. + + Args: + path: path to the tool executable. + env: a dictionary of additional environmental variables to set. + Can also be set after creation via the environment attribute. + """ + self._tool_path = path + self._cwd = None + self.environment = {var: os.environ[var] + for var in DEFAULT_PASSTHROUGH_ENV + if var in os.environ} + if env: + self.environment.update(env) + + def set_cwd(self, cwd): + self._cwd = cwd + + def set_android_environment(self, source_top=None, build_out=None, + bsp=None): + """Sets Android environment variables. + + Android has a few common variables used by a variety of tools. This + function sets any environment variables that can be derived from + the given arguments. + + All arguments are optional, any variables that depend on missing + arguments will not be modified. Requirements are: + ANDROID_BUILD_TOP: source_top + ANDROID_HOST_OUT: build_out + ANDROID_PRODUCT_OUT: build_out, bsp + + Args: + source_top: root of the source tree. + build_out: root of the build output folder. + bsp: the BSP name. + """ + if source_top: + self.environment['ANDROID_BUILD_TOP'] = source_top + + if build_out: + self.environment['ANDROID_HOST_OUT'] = os.path.join( + build_out, 'host', util.GetHostArch()) + + if build_out and bsp: + self.environment['ANDROID_PRODUCT_OUT'] = util.GetAndroidProductOut( + build_out, bsp) + + def add_caller_env_path(self): + """Adds the caller's PATH environment variable. + + Most tools do not want to inherit PATH to avoid any unexpected + interactions with the caller's environment. However, some tools + do need the PATH variable (e.g. shell scripts may need to be able + to find utilities like dirname). + + If the tool already has a PATH, the caller's PATH is appended. + """ + caller_path = os.environ.get('PATH') + if caller_path is not None: + if 'PATH' in self.environment: + self.environment['PATH'] += os.pathsep + caller_path + else: + self.environment['PATH'] = caller_path + + + def path(self): + return self._tool_path + + def exists(self): + return os.path.isfile(self._tool_path) + + def run(self, arg_array=None, piped=False): + """Executes the tool and blocks until completion. + + Args: + arg_array: list of string arguments to pass to the tool. + piped: If true, send stdout and stderr to pipes. + + Raises: + ExecuteError: if execution fails. + ReturnError: if execution returns a non-0 exit code. + + Returns: + (out, err): The output to stdout and stderr of the called tool. + Will both be None if piped=False. + """ + # Remove -- passthrough indicator from arg_array. + if arg_array and arg_array[0] == '--': + arg_array = arg_array[1:] + + # Make sure PWD is accurate on CWD change. + if self._cwd: + self.environment['PWD'] = os.path.abspath(self._cwd) + + stdout = None + stderr = None + if piped: + stdout = subprocess.PIPE + stderr = subprocess.PIPE + + try: + tool_process = subprocess.Popen( + [self._tool_path] + (arg_array or []), + env=self.environment, shell=False, + cwd=self._cwd, stdout=stdout, + stderr=stderr) + (out, err) = tool_process.communicate() + except OSError as e: + # Catch and re-raise so we can include the tool path in the message. + raise ExecuteError('"{}": {} [{}]'.format( + self._tool_path, e.errno, e.strerror)) + + # Exiting via signal gives negative return codes. + ret = tool_process.returncode + if ret < 0: + # Return the normal shell exit mask for being signaled. + ret = 128 - ret + if ret != 0: + raise ReturnError('"{}": {} ({})'.format(self._tool_path, ret, err), + errno=ret) + + return (out, err) class HostToolWrapper(ToolWrapper): - """Wraps a tool from out/host/<arch>/bin/.""" + """Wraps a tool from out/host/<arch>/bin/.""" - def __init__(self, path, build_out, bsp=None, env=None): - """Initializes a HostToolWrapper. + def __init__(self, path, build_out, bsp=None, env=None): + """Initializes a HostToolWrapper. - Args: - path: tool path relative to <build_top>/out/host/<arch>/bin/. - build_out: root of the build output folder where the tool lives. - bsp: the BSP name. Optional, but should be set for any tool that - uses ANDROID_PRODUCT_OUT (e.g. fastboot and adb). - env: a dictionary of additional environmental variables to set. - """ - # Initialize path to '' at first so we can use ANDROID_HOST_OUT. - super(HostToolWrapper, self).__init__('', env=env) - self.set_android_environment(build_out=build_out, bsp=bsp) - self._tool_path = os.path.join(self.environment['ANDROID_HOST_OUT'], - 'bin', path) + Args: + path: tool path relative to <build_top>/out/host/<arch>/bin/. + build_out: root of the build output folder where the tool lives. + bsp: the BSP name. Optional, but should be set for any tool that + uses ANDROID_PRODUCT_OUT (e.g. fastboot and adb). + env: a dictionary of additional environmental variables to set. + """ + # Initialize path to '' at first so we can use ANDROID_HOST_OUT. + super(HostToolWrapper, self).__init__('', env=env) + self.set_android_environment(build_out=build_out, bsp=bsp) + self._tool_path = os.path.join(self.environment['ANDROID_HOST_OUT'], + 'bin', path) class HostToolRunner(object): - """Serves as a HostToolWrapper factory.""" + """Serves as a HostToolWrapper factory.""" - def __init__(self, build_out): - self._build_out = build_out + def __init__(self, build_out): + self._build_out = build_out - def run(self, path, args): - host_tool = HostToolWrapper(path, self._build_out) - return host_tool.run(args) + def run(self, path, args): + host_tool = HostToolWrapper(path, self._build_out) + return host_tool.run(args) class PathToolWrapper(ToolWrapper): - """Wraps a tool expected to be in the user PATH.""" + """Wraps a tool expected to be in the user PATH.""" - def __init__(self, program, env=None): - super(PathToolWrapper, self).__init__(program, env) - self.add_caller_env_path() + def __init__(self, program, env=None): + super(PathToolWrapper, self).__init__(program, env) + self.add_caller_env_path() class PathToolRunner(object): - """Serves as a PathToolWrapper factory.""" + """Serves as a PathToolWrapper factory.""" - def run(self, path, args): - path_tool = PathToolWrapper(path) - return path_tool.run(args) + def run(self, path, args): + path_tool = PathToolWrapper(path) + return path_tool.run(args) class ProvisionDeviceTool(ToolWrapper): - """Wraps the provision-device script. - - provision-device is unique since it's built as part of the product - output rather than the host output, and also requires a pointer to - the source tree which other tools don't, so it's useful to create a - specific subclass for it. - - Note: provision-device allows two special environment variables - ANDROID_PROVISION_VENDOR_PARTITIONS and ANDROID_PROVISION_OS_PARTITIONS - that replace ANDROID_BUILD_TOP and ANDROID_PRODUCT_OUT if present. - These are for advanced usage and could easily create confusion, so - are intentionally not passed through here; if a user requires these - they will have to call provision-device manually. - """ - - def __init__(self, source_top, build_out, bsp, env=None): - """Initializes a ProvisionDeviceTool. - - Args: - source_top: root of the source tree with the vendor BSP binaries. - build_out: root of the build output folder where the tool lives. - bsp: the BSP name. - env: a dictionary of additional environmental variables to set. + """Wraps the provision-device script. + + provision-device is unique since it's built as part of the product + output rather than the host output, and also requires a pointer to + the source tree which other tools don't, so it's useful to create a + specific subclass for it. + + Note: provision-device allows two special environment variables + ANDROID_PROVISION_VENDOR_PARTITIONS and ANDROID_PROVISION_OS_PARTITIONS + that replace ANDROID_BUILD_TOP and ANDROID_PRODUCT_OUT if present. + These are for advanced usage and could easily create confusion, so + are intentionally not passed through here; if a user requires these + they will have to call provision-device manually. """ - # Initialize path to '' at first so we can use ANDROID_PRODUCT_OUT. - super(ProvisionDeviceTool, self).__init__('', env=env) - self.set_android_environment(source_top=source_top, build_out=build_out, - bsp=bsp) - self._tool_path = os.path.join(self.environment['ANDROID_PRODUCT_OUT'], - 'provision-device') - # provision-device is a shell script, so it needs to know PATH in order - # to find utilities. - self.add_caller_env_path() + def __init__(self, source_top, build_out, bsp, env=None): + """Initializes a ProvisionDeviceTool. + + Args: + source_top: root of the source tree with the vendor BSP binaries. + build_out: root of the build output folder where the tool lives. + bsp: the BSP name. + env: a dictionary of additional environmental variables to set. + """ + # Initialize path to '' at first so we can use ANDROID_PRODUCT_OUT. + super(ProvisionDeviceTool, self).__init__('', env=env) + self.set_android_environment(source_top=source_top, build_out=build_out, + bsp=bsp) + self._tool_path = os.path.join(self.environment['ANDROID_PRODUCT_OUT'], + 'provision-device') + + # provision-device is a shell script, so it needs to know PATH in order + # to find utilities. + self.add_caller_env_path() class BrunchToolWrapper(ToolWrapper): - """Legacy tool wrapper used by Brunch. - - This adds some additional functionality to the base ToolWrapper: - * Adds additional Brunch-specific environment variables. - * Sets PATH, ANDROID_PRODUCT_OUT, and ANDROID_BUILD_TOP using BDK config - settings instead of passed in variables. - """ - - _UNTOUCHABLE_ENV = ['BDK_PATH', 'PATH', 'ANDROID_PRODUCT_OUT', - 'ANDROID_BUILD_TOP'] - - def __init__(self, config, product_path, path): - super(BrunchToolWrapper, self).__init__(path) - - self._product_path = product_path - self._config = config - self._env_path = '' - if os.environ.has_key('PATH'): - self._env_path = os.environ['PATH'] - - self._import_environ() - self.environment.update({ - 'BDK_PATH': util.DEPRECATED_GetDefaultOSPath(), - 'PATH': os.pathsep.join([p for p - in [util.GetBDKPath('cli'), self._env_path] - if p]), - 'ANDROID_PRODUCT_OUT': os.path.join(self._product_path, 'out', - 'out-' + self._config.device, - 'target', 'product', - self._config.device), - 'ANDROID_BUILD_TOP': os.path.join(self._product_path, 'out', '.bdk') - }) - - def _import_environ(self): - """Walk the global environment merging in allowed variables.""" - extra_vars = self._config.bdk.allowed_environ - if extra_vars: - for var in extra_vars.split(' '): - if var in self._UNTOUCHABLE_ENV: - print ("Cannot passthrough environment variable '{0}' " - "as set in config/bdk/allowed_environ").format(var) - if os.environ.has_key(var): - self.environment[var] = os.environ[var] + """Legacy tool wrapper used by Brunch. + + This adds some additional functionality to the base ToolWrapper: + * Adds additional Brunch-specific environment variables. + * Sets PATH, ANDROID_PRODUCT_OUT, and ANDROID_BUILD_TOP using BDK config + settings instead of passed in variables. + """ + + _UNTOUCHABLE_ENV = ['BDK_PATH', 'PATH', 'ANDROID_PRODUCT_OUT', + 'ANDROID_BUILD_TOP'] + + def __init__(self, config, product_path, path): + super(BrunchToolWrapper, self).__init__(path) + + self._product_path = product_path + self._config = config + self._env_path = '' + if os.environ.has_key('PATH'): + self._env_path = os.environ['PATH'] + + self._import_environ() + self.environment.update({ + 'BDK_PATH': util.DEPRECATED_GetDefaultOSPath(), + 'PATH': os.pathsep.join([p for p + in [util.GetBDKPath('cli'), self._env_path] + if p]), + 'ANDROID_PRODUCT_OUT': os.path.join(self._product_path, 'out', + 'out-' + self._config.device, + 'target', 'product', + self._config.device), + 'ANDROID_BUILD_TOP': os.path.join(self._product_path, 'out', '.bdk') + }) + + def _import_environ(self): + """Walk the global environment merging in allowed variables.""" + extra_vars = self._config.bdk.allowed_environ + if extra_vars: + for var in extra_vars.split(' '): + if var in self._UNTOUCHABLE_ENV: + print ("Cannot passthrough environment variable '{0}' " + "as set in config/bdk/allowed_environ").format(var) + if os.environ.has_key(var): + self.environment[var] = os.environ[var] class BrunchHostToolWrapper(BrunchToolWrapper): - """Wraps a host tool for brunch workflows.""" + """Wraps a host tool for brunch workflows.""" - TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}', - 'host', '{2}', 'bin', '{3}') - DEFAULT_ARCH = 'linux-x86' + TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}', + 'host', '{2}', 'bin', '{3}') + DEFAULT_ARCH = 'linux-x86' - def __init__(self, config, product_path, name, arch=DEFAULT_ARCH): - self._tool_name = name - self._host_arch = arch - super(BrunchHostToolWrapper, self).__init__(config, product_path, name) - self._tool_path = self._build_path() + def __init__(self, config, product_path, name, arch=DEFAULT_ARCH): + self._tool_name = name + self._host_arch = arch + super(BrunchHostToolWrapper, self).__init__(config, product_path, name) + self._tool_path = self._build_path() - def _build_path(self): - return self.TOOL_PATH_FMT.format(self._product_path, self._config.device, - self._host_arch, self._tool_name) + def _build_path(self): + return self.TOOL_PATH_FMT.format(self._product_path, + self._config.device, self._host_arch, + self._tool_name) class BrunchTargetToolWrapper(BrunchHostToolWrapper): - """Wraps a target tool for brunch workflows.""" + """Wraps a target tool for brunch workflows.""" - TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}', - 'target', 'product', '{2}', '{3}') + TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}', + 'target', 'product', '{2}', '{3}') - def _build_path(self): - return self.TOOL_PATH_FMT.format(self._product_path, self._config.device, - self._config.device, self._tool_name) + def _build_path(self): + return self.TOOL_PATH_FMT.format(self._product_path, + self._config.device, + self._config.device, self._tool_name) diff --git a/cli/lib/core/tool_unittest.py b/cli/lib/core/tool_unittest.py index 2400cf5..4bca8f1 100644 --- a/cli/lib/core/tool_unittest.py +++ b/cli/lib/core/tool_unittest.py @@ -26,310 +26,307 @@ from test import stubs class ToolWrapperTest(unittest.TestCase): - """Tests for the ToolWrapper classes.""" - - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_subprocess = stubs.StubSubprocess() - self.stub_util = util_stub.StubUtil() - - tool.os = self.stub_os - tool.util = self.stub_util - tool.subprocess = self.stub_subprocess - - def test_run(self): - """Tests a basic tool run.""" - t = tool.ToolWrapper('/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(['/my/tool']) - - def test_run_piped(self): - """Tests a basic piped tool run.""" - t = tool.ToolWrapper('/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out', - ret_err='err') - command2 = self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out', - ret_err='err') - self.assertEqual(t.run(), (None, None)) - self.assertEqual(t.run(piped=True), ('out', 'err')) - - - def test_cwd(self): - """Tests changing the tool working directory.""" - t = tool.ToolWrapper('/my/tool') - t.set_cwd('/here') - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(cwd='/here', env={'PWD': '/here'}) - - def test_fail(self): - """Tests a failed execution call.""" - def fail_subprocess_call(): - error = OSError() - error.strerror = 'blah' - raise error - - t = tool.ToolWrapper('/does/not/exist') - self.stub_subprocess.AddCommand(side_effect=fail_subprocess_call) - - with self.assertRaises(tool.ExecuteError) as error: - t.run() - self.assertIn('/does/not/exist', str(error.exception)) - self.assertIn('blah', str(error.exception)) - - def test_custom_env(self): - """Tests passing custom environment variables.""" - t = tool.ToolWrapper('/my/tool', env={'foo': '1'}) - t.environment['bar'] = '2' - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(env={'foo': '1', 'bar': '2'}) - - def test_inherited_env(self): - """Tests inheriting environment variables.""" - self.stub_os.environ['USER'] = 'me' - self.stub_os.environ['PATH'] = '/should/not/inherit' - t = tool.ToolWrapper('/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(env={'USER': 'me'}) - - def test_add_caller_env_path(self): - """Tests adding the caller's environment PATH manually.""" - # No tool or caller PATH. - t = tool.ToolWrapper('/my/tool') - t.add_caller_env_path() - self.assertNotIn('PATH', t.environment) - - # Tool PATH only. - t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'}) - t.add_caller_env_path() - self.assertEqual('/foo', t.environment['PATH']) - - # Caller PATH only. - self.stub_os.environ['PATH'] = '/bar' - t = tool.ToolWrapper('/my/tool') - t.add_caller_env_path() - self.assertEqual('/bar', t.environment['PATH']) - - # Both PATHs set. - self.stub_os.environ['PATH'] = '/bar' - t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'}) - t.add_caller_env_path() - self.assertEqual('/foo:/bar', t.environment['PATH']) - - def test_source_top(self): - """Tests setting source_top.""" - t = tool.ToolWrapper('/my/tool') - t.set_android_environment(source_top='/foo') - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(env={'ANDROID_BUILD_TOP': '/foo'}) - - def test_build_out_and_bsp(self): - """Tests passing build_top and bsp to the tool.""" - self.stub_util.arch = 'test_arch' - t = tool.ToolWrapper('/my/tool') - t.set_android_environment(build_out='/build/out', bsp='bar') - command = self.stub_subprocess.AddCommand(['/my/tool']) - - t.run() - command.AssertCallContained(env={ - 'ANDROID_HOST_OUT': '/build/out/host/test_arch', - 'ANDROID_PRODUCT_OUT': '/build/out/target/product/bar' - }) - - def test_host_tool_wrapper(self): - """Tests HostToolWrapper construction.""" - self.stub_util.arch = 'test_arch' - t = tool.HostToolWrapper('tool', '/build/out') - self.stub_subprocess.AddCommand(['/build/out/host/test_arch/bin/tool']) - - t.run() - - def test_path_tool_wrapper(self): - self.stub_os.environ = {'foo': 'bar', 'baz': 'bip', 'PATH': '/usr/bin'} - t = tool.PathToolWrapper('tool') - command = self.stub_subprocess.AddCommand(['tool']) - t.run() - command.AssertCallContained(['tool'], env={'PATH': '/usr/bin'}) - - def test_provision_device_tool(self): - """Tests ProvisionDeviceTool construction.""" - self.stub_util.arch = 'test_arch' - t = tool.ProvisionDeviceTool(source_top='/source', build_out='/build/out', - bsp='board_name') - command = self.stub_subprocess.AddCommand() - - t.run() - command.AssertCallContained( - ['/build/out/target/product/board_name/provision-device'], - env={'ANDROID_BUILD_TOP': '/source', - 'ANDROID_HOST_OUT': '/build/out/host/test_arch', - 'ANDROID_PRODUCT_OUT': '/build/out/target/product/board_name'}) + """Tests for the ToolWrapper classes.""" + + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_subprocess = stubs.StubSubprocess() + self.stub_util = util_stub.StubUtil() + + tool.os = self.stub_os + tool.util = self.stub_util + tool.subprocess = self.stub_subprocess + + def test_run(self): + """Tests a basic tool run.""" + t = tool.ToolWrapper('/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(['/my/tool']) + + def test_run_piped(self): + """Tests a basic piped tool run.""" + t = tool.ToolWrapper('/my/tool') + self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out', + ret_err='err') + self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out', + ret_err='err') + self.assertEqual(t.run(), (None, None)) + self.assertEqual(t.run(piped=True), ('out', 'err')) + + + def test_cwd(self): + """Tests changing the tool working directory.""" + t = tool.ToolWrapper('/my/tool') + t.set_cwd('/here') + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(cwd='/here', env={'PWD': '/here'}) + + def test_fail(self): + """Tests a failed execution call.""" + def fail_subprocess_call(): + error = OSError() + error.strerror = 'blah' + raise error + + t = tool.ToolWrapper('/does/not/exist') + self.stub_subprocess.AddCommand(side_effect=fail_subprocess_call) + + with self.assertRaises(tool.ExecuteError) as error: + t.run() + self.assertIn('/does/not/exist', str(error.exception)) + self.assertIn('blah', str(error.exception)) + + def test_custom_env(self): + """Tests passing custom environment variables.""" + t = tool.ToolWrapper('/my/tool', env={'foo': '1'}) + t.environment['bar'] = '2' + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(env={'foo': '1', 'bar': '2'}) + + def test_inherited_env(self): + """Tests inheriting environment variables.""" + self.stub_os.environ['USER'] = 'me' + self.stub_os.environ['PATH'] = '/should/not/inherit' + t = tool.ToolWrapper('/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(env={'USER': 'me'}) + + def test_add_caller_env_path(self): + """Tests adding the caller's environment PATH manually.""" + # No tool or caller PATH. + t = tool.ToolWrapper('/my/tool') + t.add_caller_env_path() + self.assertNotIn('PATH', t.environment) + + # Tool PATH only. + t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'}) + t.add_caller_env_path() + self.assertEqual('/foo', t.environment['PATH']) + + # Caller PATH only. + self.stub_os.environ['PATH'] = '/bar' + t = tool.ToolWrapper('/my/tool') + t.add_caller_env_path() + self.assertEqual('/bar', t.environment['PATH']) + + # Both PATHs set. + self.stub_os.environ['PATH'] = '/bar' + t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'}) + t.add_caller_env_path() + self.assertEqual('/foo:/bar', t.environment['PATH']) + + def test_source_top(self): + """Tests setting source_top.""" + t = tool.ToolWrapper('/my/tool') + t.set_android_environment(source_top='/foo') + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(env={'ANDROID_BUILD_TOP': '/foo'}) + + def test_build_out_and_bsp(self): + """Tests passing build_top and bsp to the tool.""" + self.stub_util.arch = 'test_arch' + t = tool.ToolWrapper('/my/tool') + t.set_android_environment(build_out='/build/out', bsp='bar') + command = self.stub_subprocess.AddCommand(['/my/tool']) + + t.run() + command.AssertCallContained(env={ + 'ANDROID_HOST_OUT': '/build/out/host/test_arch', + 'ANDROID_PRODUCT_OUT': '/build/out/target/product/bar' + }) + + def test_host_tool_wrapper(self): + """Tests HostToolWrapper construction.""" + self.stub_util.arch = 'test_arch' + t = tool.HostToolWrapper('tool', '/build/out') + self.stub_subprocess.AddCommand(['/build/out/host/test_arch/bin/tool']) + + t.run() + + def test_path_tool_wrapper(self): + self.stub_os.environ = {'foo': 'bar', 'baz': 'bip', 'PATH': '/usr/bin'} + t = tool.PathToolWrapper('tool') + command = self.stub_subprocess.AddCommand(['tool']) + t.run() + command.AssertCallContained(['tool'], env={'PATH': '/usr/bin'}) + + def test_provision_device_tool(self): + """Tests ProvisionDeviceTool construction.""" + self.stub_util.arch = 'test_arch' + t = tool.ProvisionDeviceTool(source_top='/source', + build_out='/build/out', bsp='board_name') + command = self.stub_subprocess.AddCommand() + + t.run() + command.AssertCallContained( + ['/build/out/target/product/board_name/provision-device'], + env={'ANDROID_BUILD_TOP': '/source', + 'ANDROID_HOST_OUT': '/build/out/host/test_arch', + 'ANDROID_PRODUCT_OUT': '/build/out/target/product/board_name'}) class BrunchToolWrapperTest(unittest.TestCase): - def setUp(self): - self.stub_subprocess = stubs.StubSubprocess() - self.stub_util = util_stub.StubUtil() + def setUp(self): + self.stub_subprocess = stubs.StubSubprocess() + self.stub_util = util_stub.StubUtil() - tool.util = self.stub_util - tool.os = stubs.StubOs() - tool.subprocess = self.stub_subprocess + tool.util = self.stub_util + tool.os = stubs.StubOs() + tool.subprocess = self.stub_subprocess - self.saved_props = (config.DictStore.REQUIRED_PROPS, - config.DictStore.OPTIONAL_PROPS) - config.DictStore.REQUIRED_PROPS = config.ProductFileStore.REQUIRED_PROPS - config.DictStore.OPTIONAL_PROPS = config.ProductFileStore.OPTIONAL_PROPS - self.store = config.DictStore() - self.store.device = 'somedevice' + self.saved_props = (config.DictStore.REQUIRED_PROPS, + config.DictStore.OPTIONAL_PROPS) + config.DictStore.REQUIRED_PROPS = config.ProductFileStore.REQUIRED_PROPS + config.DictStore.OPTIONAL_PROPS = config.ProductFileStore.OPTIONAL_PROPS + self.store = config.DictStore() + self.store.device = 'somedevice' - self.product_path = 'product_path' + self.product_path = 'product_path' - def tearDown(self): - config.DictStore.REQUIRED_PROPS = self.saved_props[0] - config.DictStore.OPTIONAL_PROPS = self.saved_props[1] + def tearDown(self): + config.DictStore.REQUIRED_PROPS = self.saved_props[0] + config.DictStore.OPTIONAL_PROPS = self.saved_props[1] class TestBrunchToolRun(BrunchToolWrapperTest): - def test_base(self): - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - t.environment.clear() - t.environment['PATH'] = 'abc:123' - t.environment['BAZ'] = 'B "A" R' + def test_base(self): + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + t.environment.clear() + t.environment['PATH'] = 'abc:123' + t.environment['BAZ'] = 'B "A" R' - command = self.stub_subprocess.AddCommand(ret_code=0) - t.run(['arg1']) - command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, - stdout=None, - stderr=None, - env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) + command = self.stub_subprocess.AddCommand(ret_code=0) + t.run(['arg1']) + command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, + stdout=None, stderr=None, + env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) - command = self.stub_subprocess.AddCommand(ret_code=1) - with self.assertRaises(tool.ReturnError) as e: - t.run(['arg1']) - self.assertEqual(1, e.exception.errno) - command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, - stdout=None, - stderr=None, - env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) + command = self.stub_subprocess.AddCommand(ret_code=1) + with self.assertRaises(tool.ReturnError) as e: + t.run(['arg1']) + self.assertEqual(1, e.exception.errno) + command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, + stdout=None, stderr=None, + env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) - def test_sigabrt(self): - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - self.stub_subprocess.AddCommand(['/my/tool', 'arg1'], ret_code=-6) + def test_sigabrt(self): + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + self.stub_subprocess.AddCommand(['/my/tool', 'arg1'], ret_code=-6) - with self.assertRaises(tool.ReturnError) as e: - t.run(['arg1']) - self.assertEqual(128 + 6, e.exception.errno) + with self.assertRaises(tool.ReturnError) as e: + t.run(['arg1']) + self.assertEqual(128 + 6, e.exception.errno) - def test_cwd(self): - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - t.set_cwd('/here') - command = self.stub_subprocess.AddCommand(['/my/tool']) + def test_cwd(self): + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + t.set_cwd('/here') + command = self.stub_subprocess.AddCommand(['/my/tool']) - t.run(['arg1']) - command.AssertCallContained(cwd='/here') + t.run(['arg1']) + command.AssertCallContained(cwd='/here') class TestBrunchToolEnvironment(BrunchToolWrapperTest): - def test_baseline(self): - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - t.environment.clear() - t.environment['PATH'] = 'abc:123' - t.environment['BAZ'] = 'B "A" R' - command = self.stub_subprocess.AddCommand() - - t.run(['arg1']) - command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, - stdout=None, - stderr=None, - env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) - - def test_no_path(self): - """Ensures no exceptions if there is no PATH.""" - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) - - t.run(['arg1']) - self.assertEqual(self.stub_util.GetBDKPath('cli'), - command.GetCallArgs()[1]['env']['PATH']) - - def test_untouchable(self): - """Tests config/bdk/allowed_environ limits.""" - self.store.bdk.allowed_environ = 'BDK_PATH' - tool.os.environ['BDK_PATH'] = '/path/to/crazy/town' - - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) - - t.run(['arg1']) - self.assertEqual(self.stub_util.DEPRECATED_GetDefaultOSPath(), - command.GetCallArgs()[1]['env']['BDK_PATH']) - - def test_passthrough(self): - """Ensures config/bdk/allowed_environ is respected.""" - self.store.bdk.allowed_environ = 'FUZZY WUZZY' - tool.os.environ['FUZZY'] = 'WUZ' - tool.os.environ['WUZZY'] = 'A BEAR' - - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) - - t.run(['arg1']) - call_env = command.GetCallArgs()[1]['env'] - self.assertEqual(call_env['FUZZY'], 'WUZ') - self.assertEqual(call_env['WUZZY'], 'A BEAR') - self.assertEqual(call_env['BDK_PATH'], - self.stub_util.DEPRECATED_GetDefaultOSPath()) - - def test_product_out(self): - """Ensures ANDROID_PRODUCT_OUT meets the contract from adb.""" - t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') - command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) - - t.run(['arg1']) - self.assertEqual( - 'product_path/out/out-somedevice/target/product/somedevice', - command.GetCallArgs()[1]['env']['ANDROID_PRODUCT_OUT']) + def test_baseline(self): + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + t.environment.clear() + t.environment['PATH'] = 'abc:123' + t.environment['BAZ'] = 'B "A" R' + command = self.stub_subprocess.AddCommand() + + t.run(['arg1']) + command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None, + stdout=None, stderr=None, + env={'PATH': 'abc:123', 'BAZ': 'B "A" R'}) + + def test_no_path(self): + """Ensures no exceptions if there is no PATH.""" + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) + + t.run(['arg1']) + self.assertEqual(self.stub_util.GetBDKPath('cli'), + command.GetCallArgs()[1]['env']['PATH']) + + def test_untouchable(self): + """Tests config/bdk/allowed_environ limits.""" + self.store.bdk.allowed_environ = 'BDK_PATH' + tool.os.environ['BDK_PATH'] = '/path/to/crazy/town' + + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) + + t.run(['arg1']) + self.assertEqual(self.stub_util.DEPRECATED_GetDefaultOSPath(), + command.GetCallArgs()[1]['env']['BDK_PATH']) + + def test_passthrough(self): + """Ensures config/bdk/allowed_environ is respected.""" + self.store.bdk.allowed_environ = 'FUZZY WUZZY' + tool.os.environ['FUZZY'] = 'WUZ' + tool.os.environ['WUZZY'] = 'A BEAR' + + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) + + t.run(['arg1']) + call_env = command.GetCallArgs()[1]['env'] + self.assertEqual(call_env['FUZZY'], 'WUZ') + self.assertEqual(call_env['WUZZY'], 'A BEAR') + self.assertEqual(call_env['BDK_PATH'], + self.stub_util.DEPRECATED_GetDefaultOSPath()) + + def test_product_out(self): + """Ensures ANDROID_PRODUCT_OUT meets the contract from adb.""" + t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool') + command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1']) + + t.run(['arg1']) + self.assertEqual( + 'product_path/out/out-somedevice/target/product/somedevice', + command.GetCallArgs()[1]['env']['ANDROID_PRODUCT_OUT']) class TestBrunchHostToolWrapper(BrunchToolWrapperTest): - def test_path(self): - t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'tool') - self.assertEqual( - 'product_path/out/out-somedevice/host/linux-x86/bin/tool', - t.path()) + def test_path(self): + t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'tool') + self.assertEqual( + 'product_path/out/out-somedevice/host/linux-x86/bin/tool', + t.path()) - t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'fastboot', - arch='win-mips') - self.assertEqual( - 'product_path/out/out-somedevice/host/win-mips/bin/fastboot', - t.path()) + t = tool.BrunchHostToolWrapper(self.store, self.product_path, + 'fastboot', arch='win-mips') + self.assertEqual( + 'product_path/out/out-somedevice/host/win-mips/bin/fastboot', + t.path()) class TestBrunchTargetToolWrapper(BrunchToolWrapperTest): - def test_path(self): - t = tool.BrunchTargetToolWrapper(self.store, self.product_path, 'tool') - self.assertEqual( - 'product_path/out/out-somedevice/target/product/somedevice/tool', - t.path()) - - t = tool.BrunchTargetToolWrapper(self.store, self.product_path, - 'provision-dev') - self.assertEqual( - ('product_path/out/out-somedevice/target/product/somedevice/' - 'provision-dev'), - t.path()) + def test_path(self): + t = tool.BrunchTargetToolWrapper(self.store, self.product_path, 'tool') + self.assertEqual( + 'product_path/out/out-somedevice/target/product/somedevice/tool', + t.path()) + + t = tool.BrunchTargetToolWrapper(self.store, self.product_path, + 'provision-dev') + self.assertEqual( + ('product_path/out/out-somedevice/target/product/somedevice/' + 'provision-dev'), + t.path()) diff --git a/cli/lib/core/user_config.py b/cli/lib/core/user_config.py index b6a7398..768c63d 100644 --- a/cli/lib/core/user_config.py +++ b/cli/lib/core/user_config.py @@ -16,7 +16,6 @@ """A class storing user configuration.""" -import os import uuid from core import config @@ -34,66 +33,67 @@ PATHS = [BSP_DIR, PLATFORM_CACHE] class UserConfig(config.Store): - REQUIRED_PROPS = {METRICS_OPT_IN:['0', '1'], UID:[]} - OPTIONAL_PROPS = {BSP_DIR:[], PLATFORM_CACHE:[]} - PREFIX = 'user_' - - def __init__(self, file_path='', table='user'): - file_path = file_path or util.GetUserDataPath('config.db') - super(UserConfig, self).__init__(file_path, table) - - self._defaults = { - BSP_DIR: util.GetBDKPath('BSPs'), - PLATFORM_CACHE: util.GetBDKPath('platform_cache') - } - - - def initialize(self, opt_in=None): - """Sets up the user store. - - Prompts the user for information when necessary. - - Args: - opt_in: (optional) If provided, skip prompting and set - metrics_opt_in to the provided value. Fixes True to '1' - and False to '0'. - """ - # Fix booleans to strings. - if opt_in == False: - opt_in = '0' - elif opt_in == True: - opt_in = '1' - - # Check for opt in - print ('To help improve the quality of this product, we collect\n' - 'anonymized data on how the BDK is used. You may choose to opt out\n' - 'of this collection now (by choosing "N" at the below prompt),\n' - 'or at any point in the future by running the following command:\n' - ' bdk config set metrics_opt_in 0') - while opt_in is None: - choice = util.GetUserInput( 'Do you want to help improve the ' - 'Project Brillo BDK?\n' - '(Y/n) ').strip().upper() - if not choice or choice == 'Y': - opt_in = '1' - elif choice == 'N': - opt_in = '0' - - # pylint: disable=attribute-defined-outside-init - self.metrics_opt_in = opt_in - - self.uid = str(uuid.uuid4()) - - def _save(self, key, val): - if not val and key in self._defaults: - val = self._defaults[key] - return super(UserConfig, self)._save(key, val) - - def _load(self, key): - result = super(UserConfig, self)._load(key) - if not result and key in self._defaults: - result = self._defaults[key] - return result + REQUIRED_PROPS = {METRICS_OPT_IN:['0', '1'], UID:[]} + OPTIONAL_PROPS = {BSP_DIR:[], PLATFORM_CACHE:[]} + PREFIX = 'user_' + + def __init__(self, file_path='', table='user'): + file_path = file_path or util.GetUserDataPath('config.db') + super(UserConfig, self).__init__(file_path, table) + + self._defaults = { + BSP_DIR: util.GetBDKPath('BSPs'), + PLATFORM_CACHE: util.GetBDKPath('platform_cache') + } + + + def initialize(self, opt_in=None): + """Sets up the user store. + + Prompts the user for information when necessary. + + Args: + opt_in: (optional) If provided, skip prompting and set + metrics_opt_in to the provided value. Fixes True to '1' + and False to '0'. + """ + # Fix booleans to strings. + if opt_in == False: + opt_in = '0' + elif opt_in == True: + opt_in = '1' + + # Check for opt in + print ('To help improve the quality of this product, we collect\n' + 'anonymized data on how the BDK is used. You may choose to opt\n' + 'out of this collection now (by choosing "N" at the below\n' + 'prompt), or at any point in the future by running the\n' + 'following command:\n' + ' bdk config set metrics_opt_in 0') + while opt_in is None: + choice = util.GetUserInput('Do you want to help improve the ' + 'Project Brillo BDK?\n' + '(Y/n) ').strip().upper() + if not choice or choice == 'Y': + opt_in = '1' + elif choice == 'N': + opt_in = '0' + + # pylint: disable=attribute-defined-outside-init + self.metrics_opt_in = opt_in + + self.uid = str(uuid.uuid4()) + + def _save(self, key, val): + if not val and key in self._defaults: + val = self._defaults[key] + return super(UserConfig, self)._save(key, val) + + def _load(self, key): + result = super(UserConfig, self)._load(key) + if not result and key in self._defaults: + result = self._defaults[key] + return result # A default global UserConfig. USER_CONFIG = UserConfig() diff --git a/cli/lib/core/user_config_stub.py b/cli/lib/core/user_config_stub.py index 8b2483b..47fcace 100644 --- a/cli/lib/core/user_config_stub.py +++ b/cli/lib/core/user_config_stub.py @@ -17,26 +17,21 @@ """core.user_config module stubs.""" -import collections -import os - from core import user_config -from core import util -import error class StubUserConfig(object): - """Stubs for our core.user_config module.""" + """Stubs for our core.user_config module.""" - EXPOSED = user_config.EXPOSED + EXPOSED = user_config.EXPOSED - class UserConfig(object): - def __init__(self, file_path='user_data.db', table='totally_private'): - self.file_path = file_path - self.table = table - self.metrics_opt_in = '0' - self.uid = 'user_id' - self.bsp_dir = 'somewhere/bsps' - self.platform_cache = 'elsewhere/pc' + class UserConfig(object): + def __init__(self, file_path='user_data.db', table='totally_private'): + self.file_path = file_path + self.table = table + self.metrics_opt_in = '0' + self.uid = 'user_id' + self.bsp_dir = 'somewhere/bsps' + self.platform_cache = 'elsewhere/pc' - USER_CONFIG = UserConfig() + USER_CONFIG = UserConfig() diff --git a/cli/lib/core/user_config_unittest.py b/cli/lib/core/user_config_unittest.py index 5b85697..a1fee76 100644 --- a/cli/lib/core/user_config_unittest.py +++ b/cli/lib/core/user_config_unittest.py @@ -26,85 +26,86 @@ from test import stubs class UserConfigTest(unittest.TestCase): - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_util = util_stub.StubUtil() - self.stub_uuid = stubs.StubUuid() - - user_config.os = self.stub_os - user_config.util = self.stub_util - user_config.uuid = self.stub_uuid - - self.user_config = user_config.UserConfig(file_path=':memory:') - - def test_default_config(self): - uc = user_config.UserConfig() - # Default user config should be in user data dir. - self.assertTrue(uc._path.startswith(self.stub_util.user_data_path)) - - def test_initialize_opt_in(self): - self.user_config.initialize(opt_in=True) - self.assertEqual(self.user_config.metrics_opt_in, '1') - self.assertEqual(self.user_config.uid, self.stub_uuid.generate) - - def test_initialize_opt_out(self): - self.user_config.initialize(opt_in=False) - self.assertEqual(self.user_config.metrics_opt_in, '0') - self.assertEqual(self.user_config.uid, self.stub_uuid.generate) - - def test_opt_in_values(self): - self.user_config.metrics_opt_in = '0' - self.assertEqual(self.user_config.metrics_opt_in, '0') - self.user_config.metrics_opt_in = '1' - self.assertEqual(self.user_config.metrics_opt_in, '1') - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = '2' - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = '' - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = '-1' - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = 'not_a_number' - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = None - # Must be given in string form. - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = 1 - with self.assertRaises(ValueError): - self.user_config.metrics_opt_in = 0 - - def test_defaults(self): - self.assertIsNone(self.user_config.metrics_opt_in) - self.assertIsNone(self.user_config.uid) - self.assertEqual(self.user_config.bsp_dir, - self.stub_util.GetBDKPath('BSPs')) - self.assertEqual(self.user_config.platform_cache, - self.stub_util.GetBDKPath('platform_cache')) - - # Set values (except for metrics_opt_in, tested above). - self.user_config.uid = 'uid!' - self.assertEqual(self.user_config.uid, 'uid!') - self.user_config.bsp_dir = 'bsp_dir!' - self.assertEqual(self.user_config.bsp_dir, 'bsp_dir!') - self.user_config.platform_cache = 'platform_cache!' - self.assertEqual(self.user_config.platform_cache, 'platform_cache!') - - # Reset values. - self.user_config.uid = None - self.assertIsNone(self.user_config.uid) - self.user_config.bsp_dir = None - self.assertEqual(self.user_config.bsp_dir, - self.stub_util.GetBDKPath('BSPs')) - self.user_config.platform_cache = None - self.assertEqual(self.user_config.platform_cache, - self.stub_util.GetBDKPath('platform_cache')) - - # Empty string should behave like default for those with defaults. - self.user_config.uid = '' - self.assertEqual(self.user_config.uid, '') - self.user_config.bsp_dir = '' - self.assertEqual(self.user_config.bsp_dir, - self.stub_util.GetBDKPath('BSPs')) - self.user_config.platform_cache = '' - self.assertEqual(self.user_config.platform_cache, - self.stub_util.GetBDKPath('platform_cache')) + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_util = util_stub.StubUtil() + self.stub_uuid = stubs.StubUuid() + + user_config.os = self.stub_os + user_config.util = self.stub_util + user_config.uuid = self.stub_uuid + + self.user_config = user_config.UserConfig(file_path=':memory:') + + def test_default_config(self): + uc = user_config.UserConfig() + # Default user config should be in user data dir. + # pylint: disable=protected-access + self.assertTrue(uc._path.startswith(self.stub_util.user_data_path)) + + def test_initialize_opt_in(self): + self.user_config.initialize(opt_in=True) + self.assertEqual(self.user_config.metrics_opt_in, '1') + self.assertEqual(self.user_config.uid, self.stub_uuid.generate) + + def test_initialize_opt_out(self): + self.user_config.initialize(opt_in=False) + self.assertEqual(self.user_config.metrics_opt_in, '0') + self.assertEqual(self.user_config.uid, self.stub_uuid.generate) + + def test_opt_in_values(self): + self.user_config.metrics_opt_in = '0' + self.assertEqual(self.user_config.metrics_opt_in, '0') + self.user_config.metrics_opt_in = '1' + self.assertEqual(self.user_config.metrics_opt_in, '1') + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = '2' + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = '' + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = '-1' + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = 'not_a_number' + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = None + # Must be given in string form. + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = 1 + with self.assertRaises(ValueError): + self.user_config.metrics_opt_in = 0 + + def test_defaults(self): + self.assertIsNone(self.user_config.metrics_opt_in) + self.assertIsNone(self.user_config.uid) + self.assertEqual(self.user_config.bsp_dir, + self.stub_util.GetBDKPath('BSPs')) + self.assertEqual(self.user_config.platform_cache, + self.stub_util.GetBDKPath('platform_cache')) + + # Set values (except for metrics_opt_in, tested above). + self.user_config.uid = 'uid!' + self.assertEqual(self.user_config.uid, 'uid!') + self.user_config.bsp_dir = 'bsp_dir!' + self.assertEqual(self.user_config.bsp_dir, 'bsp_dir!') + self.user_config.platform_cache = 'platform_cache!' + self.assertEqual(self.user_config.platform_cache, 'platform_cache!') + + # Reset values. + self.user_config.uid = None + self.assertIsNone(self.user_config.uid) + self.user_config.bsp_dir = None + self.assertEqual(self.user_config.bsp_dir, + self.stub_util.GetBDKPath('BSPs')) + self.user_config.platform_cache = None + self.assertEqual(self.user_config.platform_cache, + self.stub_util.GetBDKPath('platform_cache')) + + # Empty string should behave like default for those with defaults. + self.user_config.uid = '' + self.assertEqual(self.user_config.uid, '') + self.user_config.bsp_dir = '' + self.assertEqual(self.user_config.bsp_dir, + self.stub_util.GetBDKPath('BSPs')) + self.user_config.platform_cache = '' + self.assertEqual(self.user_config.platform_cache, + self.stub_util.GetBDKPath('platform_cache')) diff --git a/cli/lib/core/util.py b/cli/lib/core/util.py index d66e2da..a9ecd5c 100644 --- a/cli/lib/core/util.py +++ b/cli/lib/core/util.py @@ -25,19 +25,19 @@ import error class Error(error.Error): - """Base error for util module.""" + """Base error for util module.""" class HostUnsupportedArchError(Error): - """Raised when run on an unsupported host architecture.""" + """Raised when run on an unsupported host architecture.""" class PathError(Error): - """Raised when an expected path can't be found.""" + """Raised when an expected path can't be found.""" class OSVersionError(Error): - """Raised when an invalid OS version is requested.""" + """Raised when an invalid OS version is requested.""" ANDROID_PRODUCTS_MK = 'AndroidProducts.mk' @@ -48,136 +48,136 @@ PLATFORM_CACHE_FORMAT = os.path.join('{user_configured_platform_cache}', def GetBDKPath(*relpath_args): - """Find a path relative to the base of the BDK.""" - # We know this file is located in <bdk>/cli/lib/core/. - core_path = os.path.dirname(__file__) - return os.path.abspath(os.path.join(core_path, '..', '..', '..', - *relpath_args)) + """Find a path relative to the base of the BDK.""" + # We know this file is located in <bdk>/cli/lib/core/. + core_path = os.path.dirname(__file__) + return os.path.abspath(os.path.join(core_path, '..', '..', '..', + *relpath_args)) def GetUserDataPath(*relpath_args): - """Find a path relative to the user data dir.""" - # User data is stored at <bdk>/user_data. - return GetBDKPath('user_data', *relpath_args) + """Find a path relative to the user data dir.""" + # User data is stored at <bdk>/user_data. + return GetBDKPath('user_data', *relpath_args) def GetBDKVersion(): - """Find the BDK version""" - # Cache the version. - # TODO(wad)(b/25952145) make this less hacky. - if vars().get('bdk_version') is not None: - return vars().get('bdk_version') - version_path = GetBDKPath('VERSION') - version = 0 # Default version. - with open(version_path, 'r') as f: - version = f.readline().strip() - vars()['bdk_version'] = version - return version + """Find the BDK version""" + # Cache the version. + # TODO(wad)(b/25952145) make this less hacky. + if vars().get('bdk_version') is not None: + return vars().get('bdk_version') + version_path = GetBDKPath('VERSION') + version = 0 # Default version. + with open(version_path, 'r') as f: + version = f.readline().strip() + vars()['bdk_version'] = version + return version def GetOSVersion(): - """Find the OS version""" - # TODO(b/27653682): Separate versions for OS & BDK. - # Note: this function will be removed once OSes are downloaded like - # BSPs. At that point, code should check the manifest for available - # versions. - return GetBDKVersion() + """Find the OS version""" + # TODO(b/27653682): Separate versions for OS & BDK. + # Note: this function will be removed once OSes are downloaded like + # BSPs. At that point, code should check the manifest for available + # versions. + return GetBDKVersion() def GetOSPath(os_version, *relpath_args): - """Find a path relative to the base of an OS. + """Find a path relative to the base of an OS. - Raises: - OSVersionError: The designated OS is not installed. - """ - if os_version != GetOSVersion(): - raise OSVersionError('Brillo {} is not installed ' - '(installed version is: {}, the BDK does not yet ' - 'support installing additional versions).'.format( - os_version, GetOSVersion())) - # For now, the BDK is actually under the OS at <os>/tools/bdk, - # so we can figure out where the OS is based on that. - return GetBDKPath('..', '..', *relpath_args) + Raises: + OSVersionError: The designated OS is not installed. + """ + if os_version != GetOSVersion(): + raise OSVersionError('Brillo {} is not installed ' + '(installed version is: {}, the BDK does not yet ' + 'support installing additional versions).'.format( + os_version, GetOSVersion())) + # For now, the BDK is actually under the OS at <os>/tools/bdk, + # so we can figure out where the OS is based on that. + return GetBDKPath('..', '..', *relpath_args) def DEPRECATED_GetDefaultOSPath(*relpath): - """DEPRECATED - use GetOSPath. + """DEPRECATED - use GetOSPath. - For use by legacy brunch commands without a target os version. - """ - return GetOSPath(GetOSVersion(), *relpath) + For use by legacy brunch commands without a target os version. + """ + return GetOSPath(GetOSVersion(), *relpath) def GetProductDir(): - """Walks from cwd upward to find the product root""" - p = os.getcwd() - while p != '/': - if os.path.isfile(os.path.join(p, ANDROID_PRODUCTS_MK)): - return p - p = os.path.dirname(p) - return None + """Walks from cwd upward to find the product root""" + p = os.getcwd() + while p != '/': + if os.path.isfile(os.path.join(p, ANDROID_PRODUCTS_MK)): + return p + p = os.path.dirname(p) + return None def GetAndroidProductOut(platform_build_out, board): - """Returns the ANDROID_PRODUCT_OUT directory. + """Returns the ANDROID_PRODUCT_OUT directory. - Args: - platform_build_out: root directory of the platform build output. - board: board name. + Args: + platform_build_out: root directory of the platform build output. + board: board name. - Returns: - Path to the product build output directory. - """ - return os.path.join(platform_build_out, 'target', 'product', board) + Returns: + Path to the product build output directory. + """ + return os.path.join(platform_build_out, 'target', 'product', board) def FindProjectSpec(): - """Walks from cwd upward to find the project spec.""" - p = os.getcwd() - while p != '/': - f = os.path.join(p, PROJECT_SPEC_FILENAME) - if os.path.isfile(f): - return f - p = os.path.dirname(p) - return None + """Walks from cwd upward to find the project spec.""" + p = os.getcwd() + while p != '/': + f = os.path.join(p, PROJECT_SPEC_FILENAME) + if os.path.isfile(f): + return f + p = os.path.dirname(p) + return None def AsShellArgs(args): - # TODO(wad)(b/25952900) we should sanitize beyond just quotes. - return ' '.join(['"%s"' % a.replace('"', '\\"') for a in args]) + # TODO(wad)(b/25952900) we should sanitize beyond just quotes. + return ' '.join(['"%s"' % a.replace('"', '\\"') for a in args]) def GetExitCode(status): - """Convert an os.wait status code to a shell return value""" - if os.WIFEXITED(status): - return os.WEXITSTATUS(status) - if os.WIFSTOPPED(status): - return 128+os.WSTOPSIG(status) - if os.WIFSIGNALED(status): - return 128+os.WTERMSIG(status) - return 1 + """Convert an os.wait status code to a shell return value""" + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + if os.WIFSTOPPED(status): + return 128+os.WSTOPSIG(status) + if os.WIFSIGNALED(status): + return 128+os.WTERMSIG(status) + return 1 def GetUserInput(prompt): - """Prompt a user for input and return the result. + """Prompt a user for input and return the result. - Also pauses all known running timers, so that their - time doesn't include user thinking time. - """ - print prompt - sys.stdout.flush() - timer.Timer.MassPauseRunningTimers() - result = sys.stdin.readline() - timer.Timer.ResumeMassPausedTimers() - return result + Also pauses all known running timers, so that their + time doesn't include user thinking time. + """ + print prompt + sys.stdout.flush() + timer.Timer.MassPauseRunningTimers() + result = sys.stdin.readline() + timer.Timer.ResumeMassPausedTimers() + return result def GetHostArch(): - """Returns the Android name of the host architecture. - - Raises: - HostError: host architecture is not supported. - """ - if os.uname()[0].lower() != 'linux': - raise HostUnsupportedArchError('The BDK only supports Linux machines.') - return 'linux-x86' + """Returns the Android name of the host architecture. + + Raises: + HostError: host architecture is not supported. + """ + if os.uname()[0].lower() != 'linux': + raise HostUnsupportedArchError('The BDK only supports Linux machines.') + return 'linux-x86' diff --git a/cli/lib/core/util_stub.py b/cli/lib/core/util_stub.py index ff15be2..45bd773 100644 --- a/cli/lib/core/util_stub.py +++ b/cli/lib/core/util_stub.py @@ -24,72 +24,73 @@ import error class StubUtil(object): - """Stubs for our core.util module.""" - - class Error(error.Error): - pass - - class HostUnsupportedArchError(Error): - pass - - class OSVersionError(Error): - pass - - PROJECT_SPEC_FILENAME = 'unittest_project_spec.xml' - - def __init__(self, bdk_path='/some/bdk/path', - user_data_path='/user/data/path', os_path='/os/path', - bdk_version='99.99', os_version='88.88', arch='fake_arch'): - self.bdk_path = bdk_path - self.user_data_path = user_data_path - self.os_path = os_path - self.bdk_version = bdk_version - self.os_version = os_version - self.arch = arch - self.arch_is_supported = True - self.project_spec = None - - def GetBDKPath(self, *relpath): - return os.path.join(self.bdk_path, *relpath) - - def GetUserDataPath(self, *relpath): - return os.path.join(self.user_data_path, *relpath) - - def DEPRECATED_GetDefaultOSPath(self, *relpath): - return os.path.join(self.os_path, *relpath) - - def GetOSPath(self, version, *relpath): - if version != self.os_version: - raise self.OSVersionError('Invalid OS version: {} (expected {})'.format( - version, self.os_version)) - return os.path.join(self.os_path, *relpath) - - def GetBDKVersion(self): - return self.bdk_version - - def GetOSVersion(self): - return self.os_version - - @staticmethod - def GetAndroidProductOut(platform_out, board): - return util.GetAndroidProductOut(platform_out, board) - - @classmethod - def AsShellArgs(cls, ary): - return util.AsShellArgs(ary) - - @classmethod - def GetExitCode(cls, retval): - if retval == 0: - return 0 - else: - return 1 - - def GetHostArch(self): - if not self.arch_is_supported: - raise self.HostUnsupportedArchError( - '{} is not supported.'.format(self.arch)) - return self.arch - - def FindProjectSpec(self): - return self.project_spec + """Stubs for our core.util module.""" + + class Error(error.Error): + pass + + class HostUnsupportedArchError(Error): + pass + + class OSVersionError(Error): + pass + + PROJECT_SPEC_FILENAME = 'unittest_project_spec.xml' + + def __init__(self, bdk_path='/some/bdk/path', + user_data_path='/user/data/path', os_path='/os/path', + bdk_version='99.99', os_version='88.88', arch='fake_arch'): + self.bdk_path = bdk_path + self.user_data_path = user_data_path + self.os_path = os_path + self.bdk_version = bdk_version + self.os_version = os_version + self.arch = arch + self.arch_is_supported = True + self.project_spec = None + + def GetBDKPath(self, *relpath): + return os.path.join(self.bdk_path, *relpath) + + def GetUserDataPath(self, *relpath): + return os.path.join(self.user_data_path, *relpath) + + def DEPRECATED_GetDefaultOSPath(self, *relpath): + return os.path.join(self.os_path, *relpath) + + def GetOSPath(self, version, *relpath): + if version != self.os_version: + raise self.OSVersionError( + 'Invalid OS version: {} (expected {})'.format(version, + self.os_version)) + return os.path.join(self.os_path, *relpath) + + def GetBDKVersion(self): + return self.bdk_version + + def GetOSVersion(self): + return self.os_version + + @staticmethod + def GetAndroidProductOut(platform_out, board): + return util.GetAndroidProductOut(platform_out, board) + + @classmethod + def AsShellArgs(cls, ary): + return util.AsShellArgs(ary) + + @classmethod + def GetExitCode(cls, retval): + if retval == 0: + return 0 + else: + return 1 + + def GetHostArch(self): + if not self.arch_is_supported: + raise self.HostUnsupportedArchError( + '{} is not supported.'.format(self.arch)) + return self.arch + + def FindProjectSpec(self): + return self.project_spec diff --git a/cli/lib/core/util_unittest.py b/cli/lib/core/util_unittest.py index 399e194..71ef2a4 100644 --- a/cli/lib/core/util_unittest.py +++ b/cli/lib/core/util_unittest.py @@ -25,97 +25,102 @@ from test import stubs class UtilTest(unittest.TestCase): - """Tests for the Util functions.""" - - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(self.stub_os) - - util.os = self.stub_os - util.open = self.stub_open.open - - def test_bdk_path(self): - # We use startswith since it might also be .pyc if unit tests have been - # previously run. - self.assertTrue(__file__.startswith( - util.GetBDKPath('cli', 'lib', 'core', 'util_unittest.py'))) - - def test_bdk_version(self): - self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('123.45') - self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] - self.assertEqual(util.GetBDKVersion(), '123.45') - - def test_os_version(self): - self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('67.89') - self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] - self.assertEqual(util.GetOSVersion(), '67.89') - - def test_os_path(self): - self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('01.23') - self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] - # We use startswith since it might also be .pyc if unit tests have been - # previously run. - self.assertTrue(__file__.startswith( - util.GetOSPath('01.23', 'tools', 'bdk', 'cli', 'lib', 'core', - 'util_unittest.py'))) - # Error should complain about requested version. - with self.assertRaisesRegexp(util.OSVersionError, '4.5'): - util.GetOSPath('4.5') - - def test_deprecated_default_os_path(self): - self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('01.23') - self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] - # We use startswith since it might also be .pyc if unit tests have been - # previously run. - self.assertTrue(__file__.startswith( - util.DEPRECATED_GetDefaultOSPath('tools', 'bdk', 'cli', 'lib', 'core', - 'util_unittest.py'))) - - def test_get_product_dir(self): - self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' - self.stub_os.path.should_exist = [ - self.stub_os.path.join('/long', util.ANDROID_PRODUCTS_MK), - self.stub_os.path.join('/long/way/down/to/top', - util.ANDROID_PRODUCTS_MK), - self.stub_os.path.join('/long/way', util.ANDROID_PRODUCTS_MK)] - # Make sure we find the closest in our direct path. - self.assertEqual(util.GetProductDir(), '/long/way') - - def test_get_no_product_dir(self): - self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' - self.stub_os.path.should_exist = [ - self.stub_os.path.join('/other', util.ANDROID_PRODUCTS_MK)] - self.assertEqual(util.GetProductDir(), None) - - def test_get_project_spec(self): - self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' - self.stub_os.path.should_exist = [ - self.stub_os.path.join('/long', util.PROJECT_SPEC_FILENAME), - self.stub_os.path.join('/long/way/down/to/top', - util.PROJECT_SPEC_FILENAME), - self.stub_os.path.join('/long/way', util.PROJECT_SPEC_FILENAME)] - # Make sure we find the closest in our direct path. - self.assertEqual(util.FindProjectSpec(), - self.stub_os.path.join('/long/way', - util.PROJECT_SPEC_FILENAME)) - - def test_get_no_project_spec(self): - self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' - self.stub_os.path.should_exist = [ - self.stub_os.path.join('/other', util.PROJECT_SPEC_FILENAME)] - self.assertEqual(util.FindProjectSpec(), None) - - def test_android_product_out(self): - self.assertEqual(util.GetAndroidProductOut('platform/dir/', 'board_name'), - 'platform/dir/target/product/board_name') - - def test_get_host(self): - self.stub_os.SetUname(('Linux', 'host_name.website.com', '99.99-awesome', - 'stuff', 'x86_64')) - self.assertEqual(util.GetHostArch(), 'linux-x86') - - def test_get_bad_host(self): - self.stub_os.SetUname(('OSX', 'host_name.website.com', '99.99-awesome', - 'stuff', 'x86_64')) - with self.assertRaises(util.HostUnsupportedArchError): - util.GetHostArch() + """Tests for the Util functions.""" + + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(self.stub_os) + + util.os = self.stub_os + util.open = self.stub_open.open + + def test_bdk_path(self): + # We use startswith since it might also be .pyc if unit tests have been + # previously run. + self.assertTrue(__file__.startswith( + util.GetBDKPath('cli', 'lib', 'core', 'util_unittest.py'))) + + def test_bdk_version(self): + self.stub_open.files[util.GetBDKPath('VERSION')] = ( + stubs.StubFile('123.45')) + self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] + self.assertEqual(util.GetBDKVersion(), '123.45') + + def test_os_version(self): + self.stub_open.files[util.GetBDKPath('VERSION')] = ( + stubs.StubFile('67.89')) + self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] + self.assertEqual(util.GetOSVersion(), '67.89') + + def test_os_path(self): + self.stub_open.files[util.GetBDKPath('VERSION')] = ( + stubs.StubFile('01.23')) + self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] + # We use startswith since it might also be .pyc if unit tests have been + # previously run. + self.assertTrue(__file__.startswith( + util.GetOSPath('01.23', 'tools', 'bdk', 'cli', 'lib', 'core', + 'util_unittest.py'))) + # Error should complain about requested version. + with self.assertRaisesRegexp(util.OSVersionError, '4.5'): + util.GetOSPath('4.5') + + def test_deprecated_default_os_path(self): + self.stub_open.files[util.GetBDKPath('VERSION')] = ( + stubs.StubFile('01.23')) + self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')] + # We use startswith since it might also be .pyc if unit tests have been + # previously run. + self.assertTrue(__file__.startswith( + util.DEPRECATED_GetDefaultOSPath('tools', 'bdk', 'cli', 'lib', + 'core', 'util_unittest.py'))) + + def test_get_product_dir(self): + self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' + self.stub_os.path.should_exist = [ + self.stub_os.path.join('/long', util.ANDROID_PRODUCTS_MK), + self.stub_os.path.join('/long/way/down/to/top', + util.ANDROID_PRODUCTS_MK), + self.stub_os.path.join('/long/way', util.ANDROID_PRODUCTS_MK)] + # Make sure we find the closest in our direct path. + self.assertEqual(util.GetProductDir(), '/long/way') + + def test_get_no_product_dir(self): + self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' + self.stub_os.path.should_exist = [ + self.stub_os.path.join('/other', util.ANDROID_PRODUCTS_MK)] + self.assertEqual(util.GetProductDir(), None) + + def test_get_project_spec(self): + self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' + self.stub_os.path.should_exist = [ + self.stub_os.path.join('/long', util.PROJECT_SPEC_FILENAME), + self.stub_os.path.join('/long/way/down/to/top', + util.PROJECT_SPEC_FILENAME), + self.stub_os.path.join('/long/way', util.PROJECT_SPEC_FILENAME)] + # Make sure we find the closest in our direct path. + self.assertEqual(util.FindProjectSpec(), + self.stub_os.path.join('/long/way', + util.PROJECT_SPEC_FILENAME)) + + def test_get_no_project_spec(self): + self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/' + self.stub_os.path.should_exist = [ + self.stub_os.path.join('/other', util.PROJECT_SPEC_FILENAME)] + self.assertEqual(util.FindProjectSpec(), None) + + def test_android_product_out(self): + self.assertEqual( + util.GetAndroidProductOut('platform/dir/', 'board_name'), + 'platform/dir/target/product/board_name') + + def test_get_host(self): + self.stub_os.SetUname(('Linux', 'host_name.website.com', + '99.99-awesome', 'stuff', 'x86_64')) + self.assertEqual(util.GetHostArch(), 'linux-x86') + + def test_get_bad_host(self): + self.stub_os.SetUname(('OSX', 'host_name.website.com', '99.99-awesome', + 'stuff', 'x86_64')) + with self.assertRaises(util.HostUnsupportedArchError): + util.GetHostArch() |