author     Steve Fung <stevefung@google.com>    2016-04-19 03:44:23 -0700
committer  Steve Fung <stevefung@google.com>    2016-04-19 03:44:33 -0700
commit     b3baeb0038327709517abe6c9ae3e7682a00e0e5 (patch)
tree       2c720739a44a68530c02d68ea60e6a299dcecb2b
parent     f574c6a64c36b12469873e82ab0ae4872059aebb (diff)
download   bdk-b3baeb0038327709517abe6c9ae3e7682a00e0e5.tar.gz
Convert core/ to 4 space indent
Convert all files in the core/ folder to a 4 space indent to comply
with PEP8 style rules.

Bug: 28007659
Test: `python test_runner.py` passes.
Test: pylint passes.

Change-Id: If470162a50e29621cea84aef02dadb816ca1c46c
-rw-r--r--  cli/lib/core/build.py                  191
-rw-r--r--  cli/lib/core/build_unittest.py         213
-rw-r--r--  cli/lib/core/config.py                 315
-rw-r--r--  cli/lib/core/config_stub.py             48
-rw-r--r--  cli/lib/core/config_unittest.py        196
-rw-r--r--  cli/lib/core/image_build.py            630
-rw-r--r--  cli/lib/core/image_build_unittest.py   906
-rw-r--r--  cli/lib/core/popen.py                    8
-rw-r--r--  cli/lib/core/product.py                255
-rw-r--r--  cli/lib/core/properties.py             442
-rw-r--r--  cli/lib/core/provision.py              218
-rw-r--r--  cli/lib/core/provision_unittest.py     364
-rw-r--r--  cli/lib/core/timer.py                  128
-rw-r--r--  cli/lib/core/timer_unittest.py         220
-rw-r--r--  cli/lib/core/tool.py                   519
-rw-r--r--  cli/lib/core/tool_unittest.py          547
-rw-r--r--  cli/lib/core/user_config.py            122
-rw-r--r--  cli/lib/core/user_config_stub.py        27
-rw-r--r--  cli/lib/core/user_config_unittest.py   165
-rw-r--r--  cli/lib/core/util.py                   192
-rw-r--r--  cli/lib/core/util_stub.py              139
-rw-r--r--  cli/lib/core/util_unittest.py          193
22 files changed, 3041 insertions(+), 2997 deletions(-)
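
The change itself is mechanical. As a rough standalone illustration only (the commit message does not say which tool performed the reindent, so the helper below is purely hypothetical), a first pass could double each line's leading indentation; continuation lines, docstrings, and wrapped argument lists still need the kind of manual re-flow visible in the hunks below.

    # Hypothetical reindent helper (illustration only, not part of this change):
    # doubles 2-space leading indentation to 4 spaces. It does not re-align
    # continuation lines or re-wrap long lines the way the hunks below do.
    import sys

    def reindent_file(path):
        with open(path) as f:
            lines = f.readlines()
        with open(path, 'w') as f:
            for line in lines:
                body = line.lstrip(' ')
                depth = len(line) - len(body)
                f.write(' ' * (depth * 2) + body)

    if __name__ == '__main__':
        for path in sys.argv[1:]:
            reindent_file(path)
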
diff --git a/cli/lib/core/build.py b/cli/lib/core/build.py
index 08b98c4..c8f7b42 100644
--- a/cli/lib/core/build.py
+++ b/cli/lib/core/build.py
@@ -27,15 +27,15 @@ import error
class Error(error.Error):
- """General build failure."""
+ """General build failure."""
class BuildTypeError(Error):
- """Raised when an invalid build type is given."""
+ """Raised when an invalid build type is given."""
class BuildUtilityError(Error):
- """Raised when a build utility fails or is missing."""
+ """Raised when a build utility fails or is missing."""
BUILD_TYPES = ('user', 'userdebug', 'eng')
@@ -43,104 +43,107 @@ BUILD_TYPES = ('user', 'userdebug', 'eng')
def _GetBuildScript(os_version, bsp, build_type, out_dir=None,
extra_make_args=None):
- """Creates a shell script to run a platform build.
+ """Creates a shell script to run a platform build.
- The script is small but uses several variables, which is why it's
- easier just to build it here and run it directly. If the script
- expands to be more complicated we probably want to move it into its
- own file.
+ The script is small but uses several variables, which is why it's
+ easier just to build it here and run it directly. If the script
+ expands to be more complicated we probably want to move it into its
+ own file.
- Args:
- os_version: version of the Brillo OS to build.
- bsp: name of the BSP to build.
- build_type: one of BUILD_TYPES.
- out_dir: directory for build output or None to build in-tree.
- extra_make_args: additional make args as a list of strings or None.
+ Args:
+ os_version: version of the Brillo OS to build.
+ bsp: name of the BSP to build.
+ build_type: one of BUILD_TYPES.
+ out_dir: directory for build output or None to build in-tree.
+ extra_make_args: additional make args as a list of strings or None.
- Returns:
- A string shell script that will build the platform.
- """
- make_args = []
+ Returns:
+ A string shell script that will build the platform.
+ """
+ make_args = []
- if out_dir is not None:
- # Android build treats relative paths as relative to the source root, use
- # abspath() to put it relative to the caller's current directory instead.
- make_args.append('OUT_DIR="{}"'.format(os.path.abspath(out_dir)))
+ if out_dir is not None:
+ # Android build treats relative paths as relative to the source root,
+ # use abspath() to put it relative to the caller's current directory
+ # instead.
+ make_args.append('OUT_DIR="{}"'.format(os.path.abspath(out_dir)))
- if extra_make_args:
- make_args.extend(extra_make_args)
+ if extra_make_args:
+ make_args.extend(extra_make_args)
- cmds = ['cd "{}"'.format(util.GetOSPath(os_version)),
- '. build/envsetup.sh',
- 'lunch "{}-{}"'.format(bsp, build_type),
- 'm {}'.format(' '.join(make_args))]
+ cmds = ['cd "{}"'.format(util.GetOSPath(os_version)),
+ '. build/envsetup.sh',
+ 'lunch "{}-{}"'.format(bsp, build_type),
+ 'm {}'.format(' '.join(make_args))]
- # Link all commands with && to exit immediately if one fails.
- return ' && '.join(cmds)
+ # Link all commands with && to exit immediately if one fails.
+ return ' && '.join(cmds)
def BuildPlatform(target, out_dir=None, extra_make_args=None):
- """Builds the Brillo platform.
-
- Caller should validate target OS and Board before calling.
-
- Args:
- target: core.parsing.target.Target to build.
- out_dir: directory for build output or None to build in-tree.
- extra_make_args: additional make args as a list of strings or None.
-
- Returns:
- The build exit code.
-
- Raises:
- BuildTypeError: target build_type is invalid.
- BuildUtilityError: subprocess invocation fails.
- """
- if target.build_type not in BUILD_TYPES:
- raise BuildTypeError('Invalid build type: {} (must be one of {})'.format(
- target.build_type, BUILD_TYPES))
-
- build_script = _GetBuildScript(target.os_version, target.board,
- target.build_type,
- out_dir, extra_make_args)
-
- # Set up the build environment. We strip out most environment variables,
- # but some values (e.g. USER, TERM) are useful or necessary for the build.
- build_env = {var: os.environ[var] for var in tool.DEFAULT_PASSTHROUGH_ENV if
- var in os.environ}
-
- # Make sure the output directory exists so we can log to it.
- if not os.path.isdir(out_dir):
- os.makedirs(out_dir)
- log_file_path = os.path.join(out_dir, 'bdk_last_build.log')
-
- with target.get_device().linked(target.os_version):
- # The build script uses bash features like && to link multiple commands and
- # . to source scripts, so we invoke via bash.
- try:
- build_subprocess = subprocess.Popen(build_script, shell=True,
- env=build_env,
- executable='bash',
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- except OSError as e:
- raise BuildUtilityError(
- 'Failed to track a platform build. '
- 'Is "bash" installed, accessible, and in the PATH?: {}'.format(e))
-
- # Piping a command to tee on the shell clobbers the exit code, so instead
- # we make them two separate subprocesses and pipe them together.
- try:
- tee_subprocess = subprocess.Popen(['tee', log_file_path], env=build_env,
- stdin=build_subprocess.stdout)
- except OSError as e:
- raise BuildUtilityError(
- 'Failed to track a platform build. '
- 'Is "tee" installed, accessible, and in the PATH?: {}'.format(e))
-
- # Close stdout to avoid pipes filling up and blocking forever if tee fails.
- # https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline.
- build_subprocess.stdout.close()
-
- tee_subprocess.communicate()
- return build_subprocess.wait()
+ """Builds the Brillo platform.
+
+ Caller should validate target OS and Board before calling.
+
+ Args:
+ target: core.parsing.target.Target to build.
+ out_dir: directory for build output or None to build in-tree.
+ extra_make_args: additional make args as a list of strings or None.
+
+ Returns:
+ The build exit code.
+
+ Raises:
+ BuildTypeError: target build_type is invalid.
+ BuildUtilityError: subprocess invocation fails.
+ """
+ if target.build_type not in BUILD_TYPES:
+ raise BuildTypeError(
+ 'Invalid build type: {} (must be one of {})'.format(
+ target.build_type, BUILD_TYPES))
+
+ build_script = _GetBuildScript(target.os_version, target.board,
+ target.build_type, out_dir, extra_make_args)
+
+ # Set up the build environment. We strip out most environment variables,
+ # but some values (e.g. USER, TERM) are useful or necessary for the build.
+ build_env = {var: os.environ[var] for var in tool.DEFAULT_PASSTHROUGH_ENV
+ if var in os.environ}
+
+ # Make sure the output directory exists so we can log to it.
+ if not os.path.isdir(out_dir):
+ os.makedirs(out_dir)
+ log_file_path = os.path.join(out_dir, 'bdk_last_build.log')
+
+ with target.get_device().linked(target.os_version):
+ # The build script uses bash features like && to link multiple commands
+ # and . to source scripts, so we invoke via bash.
+ try:
+ build_subprocess = subprocess.Popen(build_script, shell=True,
+ env=build_env,
+ executable='bash',
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ except OSError as e:
+ raise BuildUtilityError(
+ 'Failed to track a platform build. Is '
+ '"bash" installed, accessible, and in the PATH?: {}'.format(e))
+
+ # Piping a command to tee on the shell clobbers the exit code, so
+ # instead we make them two separate subprocesses and pipe them together.
+ try:
+ tee_subprocess = subprocess.Popen(['tee', log_file_path],
+ env=build_env,
+ stdin=build_subprocess.stdout)
+ except OSError as e:
+ raise BuildUtilityError(
+ 'Failed to track a platform build. Is '
+ '"tee" installed, accessible, and in the PATH?: {}'.format(e))
+
+ # Close stdout to avoid pipes filling up and blocking forever if tee
+ # fails.
+ # https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline.
+ build_subprocess.stdout.close()
+
+ tee_subprocess.communicate()
+ return build_subprocess.wait()
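
Aside from the reindent, the comments in BuildPlatform() above explain why the build and the log `tee` are two separate Popen objects rather than a single `cmd | tee` shell pipeline: the shell would report tee's exit status, not the build's. A minimal standalone sketch of that pattern, with an invented command and log path (not BDK code):

    import subprocess

    # 'false && echo done' fails at the first command, so the interesting
    # exit code belongs to the build process, not to tee.
    build = subprocess.Popen('false && echo done', shell=True,
                             executable='bash',
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
    tee = subprocess.Popen(['tee', '/tmp/example_build.log'],
                           stdin=build.stdout)
    # Close our copy of the pipe so the build sees SIGPIPE if tee exits early.
    build.stdout.close()
    tee.communicate()
    assert build.wait() == 1   # exit status of the build, independent of tee
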
diff --git a/cli/lib/core/build_unittest.py b/cli/lib/core/build_unittest.py
index fd38ac7..5fe4963 100644
--- a/cli/lib/core/build_unittest.py
+++ b/cli/lib/core/build_unittest.py
@@ -16,6 +16,7 @@
"""Unit tests for build.py."""
+
import os
import unittest
@@ -25,108 +26,112 @@ from core import util_stub
from project import target_stub
from test import stubs
+
class BuildPlatformTest(unittest.TestCase):
- # Valid arguments listed here so we can test changing one at a time.
- _BSP = 'test_board'
- _BUILD_TYPE = 'user'
- _OUT_DIR = '/foo/bar'
- _OS_VERSION = '12.34'
- _LOG_FILE = os.path.join(_OUT_DIR, 'bdk_last_build.log')
-
- def setUp(self):
- self.stub_os = stubs.StubOs()
- self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION)
- self.stub_subprocess = stubs.StubSubprocess()
-
- # Allow BuildPlatform() to make the directory for the log file.
- self.stub_os.should_makedirs = [self._OUT_DIR]
-
- build.os = self.stub_os
- build.util = self.stub_util
- build.subprocess = self.stub_subprocess
-
- self.device = device_stub.StubDevice(should_link_version=self._OS_VERSION)
- self.target = target_stub.StubTarget(
- board=self._BSP, build_type=self._BUILD_TYPE,
- os_version=self._OS_VERSION,
- device=self.device)
-
- def test_success(self):
- make_command = self.stub_subprocess.AddCommand()
- tee_command = self.stub_subprocess.AddCommand()
-
- self.assertEqual(0, build.BuildPlatform(self.target, self._OUT_DIR))
-
- # pylint: disable=protected-access
- make_command.AssertCallWas(
- build._GetBuildScript(self.target.os_version,
- self._BSP, self._BUILD_TYPE, self._OUT_DIR),
- stdout=self.stub_subprocess.PIPE, stderr=self.stub_subprocess.STDOUT,
- executable='bash', shell=True, env={})
- tee_command.AssertCallWas(['tee', self._LOG_FILE],
- stdin=make_command.stdout, env={})
-
- def test_success_extra_make_args(self):
- """Tests passing additional make args from the user."""
- extra_make_args = ['-j', '40']
- make_command = self.stub_subprocess.AddCommand()
- self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
-
- self.assertEqual(0, build.BuildPlatform(
- self.target, self._OUT_DIR,
- extra_make_args=extra_make_args))
-
- # pylint: disable=protected-access
- make_command.AssertCallWas(
- build._GetBuildScript(self.target.os_version,
- self._BSP, self._BUILD_TYPE, self._OUT_DIR,
- extra_make_args=extra_make_args),
- stdout=self.stub_subprocess.PIPE, stderr=self.stub_subprocess.STDOUT,
- executable='bash', shell=True, env={})
-
- def test_success_env_passthrough(self):
- """Tests passing needed environmental variables through."""
- self.stub_os.environ['USER'] = 'test_user'
- self.stub_os.environ['PATH'] = 'test_path'
- make_command = self.stub_subprocess.AddCommand()
- tee_command = self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
-
- self.assertEqual(0, build.BuildPlatform(
- self.target, self._OUT_DIR))
-
- # Make sure USER passes through but PATH does not.
- make_command.AssertCallContained(env={'USER': 'test_user'})
- tee_command.AssertCallContained(env={'USER': 'test_user'})
-
- def test_return_exit_code(self):
- """Tests that the make exit code is returned."""
- self.stub_subprocess.AddCommand(ret_code=1)
- self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
-
- self.assertEqual(1, build.BuildPlatform(
- self.target, self._OUT_DIR))
-
- def test_invalid_build_type(self):
- self.target.build_type = 'bad_build_type'
- with self.assertRaises(build.BuildTypeError):
- build.BuildPlatform(self.target, self._OUT_DIR)
-
- @staticmethod
- def _raise_oserror():
- raise OSError('a faked OSError has occurred.')
-
- def test_missing_bash(self):
- """Tests passing additional make args from the user."""
- # Expect a make command.
- self.stub_subprocess.AddCommand(init_side_effect=self._raise_oserror)
- with self.assertRaises(build.BuildUtilityError):
- build.BuildPlatform(self.target, self._OUT_DIR)
-
- def test_missing_tee(self):
- """Tests passing additional make args from the user."""
- # Expect a make command.
- self.stub_subprocess.AddCommand()
- self.stub_subprocess.AddCommand(['tee', self._LOG_FILE],
- init_side_effect=self._raise_oserror)
- with self.assertRaises(build.BuildUtilityError):
- build.BuildPlatform(self.target, self._OUT_DIR)
+ # Valid arguments listed here so we can test changing one at a time.
+ _BSP = 'test_board'
+ _BUILD_TYPE = 'user'
+ _OUT_DIR = '/foo/bar'
+ _OS_VERSION = '12.34'
+ _LOG_FILE = os.path.join(_OUT_DIR, 'bdk_last_build.log')
+
+ def setUp(self):
+ self.stub_os = stubs.StubOs()
+ self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION)
+ self.stub_subprocess = stubs.StubSubprocess()
+
+ # Allow BuildPlatform() to make the directory for the log file.
+ self.stub_os.should_makedirs = [self._OUT_DIR]
+
+ build.os = self.stub_os
+ build.util = self.stub_util
+ build.subprocess = self.stub_subprocess
+
+ self.device = device_stub.StubDevice(
+ should_link_version=self._OS_VERSION)
+ self.target = target_stub.StubTarget(
+ board=self._BSP, build_type=self._BUILD_TYPE,
+ os_version=self._OS_VERSION,
+ device=self.device)
+
+ def test_success(self):
+ make_command = self.stub_subprocess.AddCommand()
+ tee_command = self.stub_subprocess.AddCommand()
+
+ self.assertEqual(0, build.BuildPlatform(self.target, self._OUT_DIR))
+
+ # pylint: disable=protected-access
+ make_command.AssertCallWas(
+ build._GetBuildScript(self.target.os_version,
+ self._BSP, self._BUILD_TYPE, self._OUT_DIR),
+ stdout=self.stub_subprocess.PIPE,
+ stderr=self.stub_subprocess.STDOUT,
+ executable='bash', shell=True, env={})
+ tee_command.AssertCallWas(['tee', self._LOG_FILE],
+ stdin=make_command.stdout, env={})
+
+ def test_success_extra_make_args(self):
+ """Tests passing additional make args from the user."""
+ extra_make_args = ['-j', '40']
+ make_command = self.stub_subprocess.AddCommand()
+ self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
+
+ self.assertEqual(0, build.BuildPlatform(
+ self.target, self._OUT_DIR,
+ extra_make_args=extra_make_args))
+
+ # pylint: disable=protected-access
+ make_command.AssertCallWas(
+ build._GetBuildScript(self.target.os_version,
+ self._BSP, self._BUILD_TYPE, self._OUT_DIR,
+ extra_make_args=extra_make_args),
+ stdout=self.stub_subprocess.PIPE,
+ stderr=self.stub_subprocess.STDOUT,
+ executable='bash', shell=True, env={})
+
+ def test_success_env_passthrough(self):
+ """Tests passing needed environmental variables through."""
+ self.stub_os.environ['USER'] = 'test_user'
+ self.stub_os.environ['PATH'] = 'test_path'
+ make_command = self.stub_subprocess.AddCommand()
+ tee_command = self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
+
+ self.assertEqual(0, build.BuildPlatform(
+ self.target, self._OUT_DIR))
+
+ # Make sure USER passes through but PATH does not.
+ make_command.AssertCallContained(env={'USER': 'test_user'})
+ tee_command.AssertCallContained(env={'USER': 'test_user'})
+
+ def test_return_exit_code(self):
+ """Tests that the make exit code is returned."""
+ self.stub_subprocess.AddCommand(ret_code=1)
+ self.stub_subprocess.AddCommand(['tee', self._LOG_FILE])
+
+ self.assertEqual(1, build.BuildPlatform(
+ self.target, self._OUT_DIR))
+
+ def test_invalid_build_type(self):
+ self.target.build_type = 'bad_build_type'
+ with self.assertRaises(build.BuildTypeError):
+ build.BuildPlatform(self.target, self._OUT_DIR)
+
+ @staticmethod
+ def _raise_oserror():
+ raise OSError('a faked OSError has occurred.')
+
+ def test_missing_bash(self):
+ """Tests passing additional make args from the user."""
+ # Expect a make command.
+ self.stub_subprocess.AddCommand(init_side_effect=self._raise_oserror)
+ with self.assertRaises(build.BuildUtilityError):
+ build.BuildPlatform(self.target, self._OUT_DIR)
+
+ def test_missing_tee(self):
+ """Tests passing additional make args from the user."""
+ # Expect a make command.
+ self.stub_subprocess.AddCommand()
+ self.stub_subprocess.AddCommand(['tee', self._LOG_FILE],
+ init_side_effect=self._raise_oserror)
+ with self.assertRaises(build.BuildUtilityError):
+ build.BuildPlatform(self.target, self._OUT_DIR)
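
setUp() above swaps in stubs by assigning to module attributes (build.os, build.util, build.subprocess). The sketch below shows the same injection pattern in a self-contained form; _FakeSubprocess and _Runner are invented stand-ins, not classes from the BDK test.stubs module:

    import unittest

    class _FakeSubprocess(object):
        """Invented stand-in that records calls and always 'succeeds'."""
        def __init__(self):
            self.calls = []

        def call(self, args):
            self.calls.append(args)
            return 0

    class _Runner(object):
        """Toy object that shells out via an injected dependency."""
        subprocess = None   # replaced by the test, like build.subprocess above

        def run(self):
            return self.subprocess.call(['true'])

    class InjectionExampleTest(unittest.TestCase):
        def test_injected_subprocess(self):
            runner = _Runner()
            runner.subprocess = _FakeSubprocess()
            self.assertEqual(0, runner.run())
            self.assertEqual([['true']], runner.subprocess.calls)

    if __name__ == '__main__':
        unittest.main()
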
diff --git a/cli/lib/core/config.py b/cli/lib/core/config.py
index c64f876..cd232ad 100644
--- a/cli/lib/core/config.py
+++ b/cli/lib/core/config.py
@@ -20,182 +20,181 @@
import os
import sqlite3
import string
-import uuid
from core import properties
-from core import util
class Store(properties.PropBase):
- """config.Store
- A sqlite3-backed dict interface for storing persistent data.
+ """A sqlite3-backed dict interface for storing persistent data.
+
Usage:
- cd = config.Store('data.db')
- cs['mykey'] = 'my val'
- print cs['mykey']
- 'mykey'
- """
-
- PREFIX = 'data_'
- CACHING = True
-
- def __init__(self, file_path, table='data'):
- super(Store, self).__init__()
- self._conn = None
- self._path = file_path
- self._table = table
- self._initialized = False
-
- def _setup(self):
- if self._initialized:
- return True
- if self._path != ':memory:':
- db_dir = os.path.dirname(self._path)
- if not os.path.isdir(db_dir):
- os.makedirs(db_dir)
- self._conn = sqlite3.connect(self._path);
- self._conn.execute('create table if not exists %s '
- '(key text PRIMARY KEY, val text)' % self._table)
- self._initialized = True
-
- def dict(self):
- """Dumps the entire table as a dict."""
- if not self._initialized:
- self._setup()
- c = self._conn.cursor()
- d = {}
- for row in c.execute("select key, val from %s" % self._table):
- d[row[0]] = row[1]
- return d
-
- def _Get(self, key):
- if not self._initialized:
- self._setup()
- c = self._conn.cursor()
- c.execute("select val from %s where key=?" % self._table, (key, ))
- ret = c.fetchone()
- if ret:
- return ret[0]
- return None
-
- def _Set(self, key, val):
- if not self._initialized:
- self._setup()
- c = self._conn.cursor()
- c.execute("insert or replace into %s values (?, ?)" % self._table,
- (key, val))
- self._conn.commit()
- return val
-
- def _load(self, key):
- if key in self.properties():
- key = self.PREFIX + key
- return self._Get(key)
-
- def _save(self, key, value):
- if key in self.properties():
- key = self.PREFIX + key
- return self._Set(key, value)
+ cs = config.Store('data.db')
+ cs['mykey'] = 'my val'
+ print cs['mykey']
+ 'my val'
+ """
+
+ PREFIX = 'data_'
+ CACHING = True
+
+ def __init__(self, file_path, table='data'):
+ super(Store, self).__init__()
+ self._conn = None
+ self._path = file_path
+ self._table = table
+ self._initialized = False
+
+ def _setup(self):
+ if self._initialized:
+ return True
+ if self._path != ':memory:':
+ db_dir = os.path.dirname(self._path)
+ if not os.path.isdir(db_dir):
+ os.makedirs(db_dir)
+ self._conn = sqlite3.connect(self._path)
+ self._conn.execute('create table if not exists %s '
+ '(key text PRIMARY KEY, val text)' % self._table)
+ self._initialized = True
+
+ def dict(self):
+ """Dumps the entire table as a dict."""
+ if not self._initialized:
+ self._setup()
+ c = self._conn.cursor()
+ d = {}
+ for row in c.execute("select key, val from %s" % self._table):
+ d[row[0]] = row[1]
+ return d
+
+ def _Get(self, key):
+ if not self._initialized:
+ self._setup()
+ c = self._conn.cursor()
+ c.execute("select val from %s where key=?" % self._table, (key, ))
+ ret = c.fetchone()
+ if ret:
+ return ret[0]
+ return None
+
+ def _Set(self, key, val):
+ if not self._initialized:
+ self._setup()
+ c = self._conn.cursor()
+ c.execute("insert or replace into %s values (?, ?)" % self._table,
+ (key, val))
+ self._conn.commit()
+ return val
+
+ def _load(self, key):
+ if key in self.properties():
+ key = self.PREFIX + key
+ return self._Get(key)
+
+ def _save(self, key, value):
+ if key in self.properties():
+ key = self.PREFIX + key
+ return self._Set(key, value)
class DictStore(properties.PropBase):
- """config.DictStore
- A dict-backed store primarily used for in-memory replacement for other
- stores in tests.
- """
- REQUIRED_PROPS = {}
- OPTIONAL_PROPS = {}
+ """A dict-backed store primarily used for in-memory replacement for other
+ stores in tests.
+ """
+ REQUIRED_PROPS = {}
+ OPTIONAL_PROPS = {}
- def __init__(self):
- super(DictStore, self).__init__()
- self._d = {}
+ def __init__(self):
+ super(DictStore, self).__init__()
+ self._d = {}
- def _save(self, key, value):
- self._d[key] = value
- return value
+ def _save(self, key, value):
+ self._d[key] = value
+ return value
- def _load(self, key):
- if not key in self._d:
- return ''
- return self._d[key]
+ def _load(self, key):
+ if not key in self._d:
+ return ''
+ return self._d[key]
class FileStore(properties.PropBase):
- """config.FileStore
- A file-backed dict interface for storing persistent data.
+ """A file-backed dict interface for storing persistent data.
Usage:
- cd = config.FileStore(os.path.join(PRODUCT_DIR, 'data_dir'))
- cd['myprefix/mykey'] = 'my val'
- print cd['myprefix/mykey']
- 'my val'
- """
-
- CACHING = True
-
- def __init__(self, path_prefix='config'):
- self._prefix = path_prefix
- super(FileStore, self).__init__()
-
- def _remove_comments(self, string_):
- """Returns |string_| with comments removed and whitespace adjusted.
-
- Removes whitespace from beginning and end of all lines,
- newline characters are replaced with ' '.
+ cd = config.FileStore(os.path.join(PRODUCT_DIR, 'data_dir'))
+ cd['myprefix/mykey'] = 'my val'
+ print cd['myprefix/mykey']
+ 'my val'
"""
- value = ''
- lines = string_.split('\n')
- for line in lines:
- stripped_line = line.strip()
- if len(stripped_line) > 0 and stripped_line[0] == '#':
- continue
- value += line + ' '
- return value.strip()
-
- def _load_from_file(self, keyfile):
- """Reads in the contents of |keyfile| stripping out comments and newlines"""
- value = ''
- if not os.path.exists(keyfile):
- return value
- with open(keyfile, 'r') as f:
- value = f.read()
- return self._remove_comments(value)
-
- def _save_to_file(self, keyfile, value):
- """Write |value| to |keyfile|"""
- if type(value) != str:
- raise TypeError, 'value must be a str'
- if not os.path.exists(os.path.dirname(keyfile)):
- os.makedirs(os.path.dirname(keyfile))
- with open(keyfile, 'w') as f:
- # Note, if comments are written, they will not be read back in!
- f.write(value)
- return self._remove_comments(value)
-
- def _key_to_path(self, key):
- """Converts a key of [key0/[key2/[.../]]key to a host-compatible path"""
- return os.path.join(self._prefix, *string.split(key, '/'))
-
- def _load(self, key):
- """Returns the value for |key|"""
- keyfile = self._key_to_path(key)
- return self._load_from_file(keyfile)
-
- def _save(self, key, value):
- """Assignes |value| to |key|"""
- keyfile = self._key_to_path(key)
- return self._save_to_file(keyfile, value)
+
+ CACHING = True
+
+ def __init__(self, path_prefix='config'):
+ self._prefix = path_prefix
+ super(FileStore, self).__init__()
+
+ def _remove_comments(self, string_):
+ """Returns |string_| with comments removed and whitespace adjusted.
+
+ Removes whitespace from beginning and end of all lines,
+ newline characters are replaced with ' '.
+ """
+ value = ''
+ lines = string_.split('\n')
+ for line in lines:
+ stripped_line = line.strip()
+ if len(stripped_line) > 0 and stripped_line[0] == '#':
+ continue
+ value += line + ' '
+ return value.strip()
+
+ def _load_from_file(self, keyfile):
+ """Reads in the contents of |keyfile| stripping out comments and
+ newlines
+ """
+ value = ''
+ if not os.path.exists(keyfile):
+ return value
+ with open(keyfile, 'r') as f:
+ value = f.read()
+ return self._remove_comments(value)
+
+ def _save_to_file(self, keyfile, value):
+ """Write |value| to |keyfile|"""
+ if type(value) != str:
+ raise TypeError, 'value must be a str'
+ if not os.path.exists(os.path.dirname(keyfile)):
+ os.makedirs(os.path.dirname(keyfile))
+ with open(keyfile, 'w') as f:
+ # Note, if comments are written, they will not be read back in!
+ f.write(value)
+ return self._remove_comments(value)
+
+ def _key_to_path(self, key):
+ """Converts a key of [key0/[key2/[.../]]key to a host-compatible path"""
+ return os.path.join(self._prefix, *string.split(key, '/'))
+
+ def _load(self, key):
+ """Returns the value for |key|"""
+ keyfile = self._key_to_path(key)
+ return self._load_from_file(keyfile)
+
+ def _save(self, key, value):
+ """Assignes |value| to |key|"""
+ keyfile = self._key_to_path(key)
+ return self._save_to_file(keyfile, value)
class ProductFileStore(FileStore):
- REQUIRED_PROPS = {'name': [], 'brand': [], 'device': [], 'manufacturer': []}
- OPTIONAL_PROPS = {
- 'bdk/version': [], 'bdk/buildtype': ['eng', 'user', 'userdebug'],
- 'bdk/java': ['0', '1'], 'bdk/allowed_environ': [],
- 'brillo/product_id': [], 'copy_files': [],
- 'brillo/crash_server': [], 'packages': []}
-
- def __init__(self, product_path):
- if product_path is None:
- raise ValueError, 'The product path must not be None.'
- super(ProductFileStore, self).__init__(os.path.join(product_path, 'config'))
+ REQUIRED_PROPS = {'name': [], 'brand': [], 'device': [], 'manufacturer': []}
+ OPTIONAL_PROPS = {
+ 'bdk/version': [], 'bdk/buildtype': ['eng', 'user', 'userdebug'],
+ 'bdk/java': ['0', '1'], 'bdk/allowed_environ': [],
+ 'brillo/product_id': [], 'copy_files': [],
+ 'brillo/crash_server': [], 'packages': []}
+
+ def __init__(self, product_path):
+ if product_path is None:
+ raise ValueError, 'The product path must not be None.'
+ super(ProductFileStore, self).__init__(os.path.join(product_path,
+ 'config'))
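
Store above wraps sqlite3 as a single key/value table and uses `insert or replace` so writes behave like dict assignment. A standalone sketch of that pattern using only the standard library (the table name and keys are examples, not BDK values):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('create table if not exists data '
                 '(key text PRIMARY KEY, val text)')
    # "insert or replace" gives dict-like set semantics keyed on the PRIMARY KEY.
    conn.execute('insert or replace into data values (?, ?)', ('mykey', 'my val'))
    conn.execute('insert or replace into data values (?, ?)', ('mykey', 'my val 2'))
    conn.commit()
    cur = conn.cursor()
    cur.execute('select val from data where key=?', ('mykey',))
    assert cur.fetchone()[0] == 'my val 2'
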
diff --git a/cli/lib/core/config_stub.py b/cli/lib/core/config_stub.py
index 2644bcb..9131515 100644
--- a/cli/lib/core/config_stub.py
+++ b/cli/lib/core/config_stub.py
@@ -18,35 +18,35 @@
class StubConfig(object):
- """Stubs the core.config module.
+ """Stubs the core.config module.
- Attributes:
- user_store: the StubUserStore object that will be passed to the
- code under test. Modify this to get the desired behavior.
- """
+ Attributes:
+ user_store: the StubUserStore object that will be passed to the
+ code under test. Modify this to get the desired behavior.
+ """
- def __init__(self, **kwargs):
- """Initializes a StubConfig object.
+ def __init__(self, **kwargs):
+ """Initializes a StubConfig object.
- Args:
- kwargs: passed through to StubUserStore init.
- """
- self.user_store = self.StubUserStore(**kwargs)
+ Args:
+ kwargs: passed through to StubUserStore init.
+ """
+ self.user_store = self.StubUserStore(**kwargs)
- def UserStore(self, *_args, **_kwargs):
- """Returns our user_store object."""
- return self.user_store
+ def UserStore(self, *_args, **_kwargs):
+ """Returns our user_store object."""
+ return self.user_store
- class StubUserStore(object):
- """Stubs the config.UserStore class."""
+ class StubUserStore(object):
+ """Stubs the config.UserStore class."""
- def __init__(self, metrics_opt_in='1', is_complete=True):
- self.metrics_opt_in = metrics_opt_in
- self.is_complete = is_complete
- self.uid = 0
+ def __init__(self, metrics_opt_in='1', is_complete=True):
+ self.metrics_opt_in = metrics_opt_in
+ self.is_complete = is_complete
+ self.uid = 0
- def initialize(self, *_args, **_kwargs):
- pass
+ def initialize(self, *_args, **_kwargs):
+ pass
- def complete(self):
- return self.is_complete
+ def complete(self):
+ return self.is_complete
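
For illustration, this is how code under test might consume StubConfig in place of the real core.config module. metrics_enabled() is a hypothetical consumer written for this example, and the import assumes the cli/lib directory is importable, as it is when the test suite runs:

    from core import config_stub

    def metrics_enabled(config_module):
        """Hypothetical consumer: reads the opt-in flag from UserStore()."""
        return config_module.UserStore().metrics_opt_in == '1'

    stub = config_stub.StubConfig(metrics_opt_in='0')
    assert metrics_enabled(stub) is False
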
diff --git a/cli/lib/core/config_unittest.py b/cli/lib/core/config_unittest.py
index 8daae9f..7145cfe 100644
--- a/cli/lib/core/config_unittest.py
+++ b/cli/lib/core/config_unittest.py
@@ -25,119 +25,119 @@ from test import stubs
class ProductFileStoreTest(unittest.TestCase):
- def setUp(self):
- self.product_path = 'product_path'
- stub_os = stubs.StubOs()
- self.stub_open = stubs.StubOpen(stub_os)
- config.open = self.stub_open.open
- config.os = stub_os
- self.store = config.ProductFileStore(self.product_path)
-
- def test_load_base(self):
- brand_path = 'product_path/config/brand'
- self.stub_open.files[brand_path] = stubs.StubFile('my-brand')
- config.os.path.should_exist.append(brand_path)
- self.assertEqual(self.store.brand, 'my-brand')
-
- def test_load_deep(self):
- version_path = 'product_path/config/bdk/version'
- expected = 'veggies.1.2.3.4'
- self.stub_open.files[version_path] = stubs.StubFile(expected)
- config.os.path.should_exist.append(version_path)
- self.assertEqual(self.store.bdk.version, expected)
-
- def test_load_comments(self):
- expected = 'veggies.1.2.3.4'
- input_ = """
+ def setUp(self):
+ self.product_path = 'product_path'
+ stub_os = stubs.StubOs()
+ self.stub_open = stubs.StubOpen(stub_os)
+ config.open = self.stub_open.open
+ config.os = stub_os
+ self.store = config.ProductFileStore(self.product_path)
+
+ def test_load_base(self):
+ brand_path = 'product_path/config/brand'
+ self.stub_open.files[brand_path] = stubs.StubFile('my-brand')
+ config.os.path.should_exist.append(brand_path)
+ self.assertEqual(self.store.brand, 'my-brand')
+
+ def test_load_deep(self):
+ version_path = 'product_path/config/bdk/version'
+ expected = 'veggies.1.2.3.4'
+ self.stub_open.files[version_path] = stubs.StubFile(expected)
+ config.os.path.should_exist.append(version_path)
+ self.assertEqual(self.store.bdk.version, expected)
+
+ def test_load_comments(self):
+ expected = 'veggies.1.2.3.4'
+ input_ = """
# Current version: name.num.bers
%s
# That's it!
""" % expected
- f = stubs.StubFile(input_)
- self.stub_open.files['product_path/config/bdk/version'] = f
- config.os.path.should_exist = ['product_path/config/bdk',
- 'product_path/config/bdk/version']
- self.assertEqual(self.store.bdk.version, expected)
- self.assertEqual(f.contents, string.split(input_, '\n'))
-
- def test_load_multiline(self):
- expected = 'ledflasher ledservice'
- input_ = """
+ f = stubs.StubFile(input_)
+ self.stub_open.files['product_path/config/bdk/version'] = f
+ config.os.path.should_exist = ['product_path/config/bdk',
+ 'product_path/config/bdk/version']
+ self.assertEqual(self.store.bdk.version, expected)
+ self.assertEqual(f.contents, string.split(input_, '\n'))
+
+ def test_load_multiline(self):
+ expected = 'ledflasher ledservice'
+ input_ = """
ledflasher
ledservice
"""
- f = stubs.StubFile(input_)
- self.stub_open.files['product_path/config/packages'] = f
- config.os.path.should_exist = ['product_path/config',
- 'product_path/config/packages']
- self.assertEqual(self.store.packages, expected)
- self.assertEqual(f.contents, string.split(input_, '\n'))
-
- def test_load_invalid(self):
- with self.assertRaises(AttributeError):
- _ = self.store.no.real.thing
-
- def test_save_exists(self):
- f = stubs.StubFile()
- self.stub_open.files['product_path/config/bdk/version'] = f
- config.os.path.should_exist += ['product_path/config/bdk',
- 'product_path/config/bdk/version']
- expected = 'everything awesome'
- self.store.bdk.version = expected
- self.assertEqual(f.contents, [expected])
-
- def test_save_makedirs_fail(self):
- config.os.path.should_exist = []
- with self.assertRaises(OSError):
- self.store.bdk.version = 'dontmatter'
-
- def test_save_makedirs_works(self):
- f = stubs.StubFile()
- self.stub_open.files['product_path/config/bdk/version'] = f
- config.os.should_makedirs = ['product_path/config/bdk']
- expected = 'everything awesome'
- self.store.bdk.version = expected
- self.assertEqual(f.contents, [expected])
-
- def test_save_valid_buildtype(self):
- f = stubs.StubFile()
- self.stub_open.files['product_path/config/bdk/buildtype'] = f
- config.os.should_makedirs = ['product_path/config/bdk']
- expected = 'userdebug'
- with self.assertRaises(ValueError):
- self.store.bdk.buildtype = 'not legit'
- self.store.bdk.buildtype = expected
- self.assertEqual(f.contents, [expected])
-
- def test_save_load_comments(self):
- f = stubs.StubFile()
- self.stub_open.files['product_path/config/packages'] = f
- config.os.path.should_exist = ['product_path/config',
- 'product_path/config/packages']
- input_ = """
+ f = stubs.StubFile(input_)
+ self.stub_open.files['product_path/config/packages'] = f
+ config.os.path.should_exist = ['product_path/config',
+ 'product_path/config/packages']
+ self.assertEqual(self.store.packages, expected)
+ self.assertEqual(f.contents, string.split(input_, '\n'))
+
+ def test_load_invalid(self):
+ with self.assertRaises(AttributeError):
+ _ = self.store.no.real.thing
+
+ def test_save_exists(self):
+ f = stubs.StubFile()
+ self.stub_open.files['product_path/config/bdk/version'] = f
+ config.os.path.should_exist += ['product_path/config/bdk',
+ 'product_path/config/bdk/version']
+ expected = 'everything awesome'
+ self.store.bdk.version = expected
+ self.assertEqual(f.contents, [expected])
+
+ def test_save_makedirs_fail(self):
+ config.os.path.should_exist = []
+ with self.assertRaises(OSError):
+ self.store.bdk.version = 'dontmatter'
+
+ def test_save_makedirs_works(self):
+ f = stubs.StubFile()
+ self.stub_open.files['product_path/config/bdk/version'] = f
+ config.os.should_makedirs = ['product_path/config/bdk']
+ expected = 'everything awesome'
+ self.store.bdk.version = expected
+ self.assertEqual(f.contents, [expected])
+
+ def test_save_valid_buildtype(self):
+ f = stubs.StubFile()
+ self.stub_open.files['product_path/config/bdk/buildtype'] = f
+ config.os.should_makedirs = ['product_path/config/bdk']
+ expected = 'userdebug'
+ with self.assertRaises(ValueError):
+ self.store.bdk.buildtype = 'not legit'
+ self.store.bdk.buildtype = expected
+ self.assertEqual(f.contents, [expected])
+
+ def test_save_load_comments(self):
+ f = stubs.StubFile()
+ self.stub_open.files['product_path/config/packages'] = f
+ config.os.path.should_exist = ['product_path/config',
+ 'product_path/config/packages']
+ input_ = """
# Running service:
ledservice
# Debugging app
ledprobe
"""
- self.store.packages = input_
- self.assertEqual(string.join(f.contents, '\n'), input_)
- expected = 'ledservice ledprobe'
- self.assertEqual(self.store.packages, expected)
-
- def test_dict(self):
- f = stubs.StubFile()
- self.stub_open.files['product_path/config/packages'] = f
- config.os.path.should_exist = ['product_path/config',
- 'product_path/config/packages']
- input_ = """
+ self.store.packages = input_
+ self.assertEqual(string.join(f.contents, '\n'), input_)
+ expected = 'ledservice ledprobe'
+ self.assertEqual(self.store.packages, expected)
+
+ def test_dict(self):
+ f = stubs.StubFile()
+ self.stub_open.files['product_path/config/packages'] = f
+ config.os.path.should_exist = ['product_path/config',
+ 'product_path/config/packages']
+ input_ = """
# Running service:
ledservice
# Debugging app
ledprobe
"""
- self.store.packages = input_
- self.assertEqual(string.join(f.contents, '\n'), input_)
- expected = 'ledservice ledprobe'
- self.assertEqual(self.store.dict()['packages'], expected)
+ self.store.packages = input_
+ self.assertEqual(string.join(f.contents, '\n'), input_)
+ expected = 'ledservice ledprobe'
+ self.assertEqual(self.store.dict()['packages'], expected)
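
The comment-handling assertions above (test_load_comments, test_save_load_comments) follow from FileStore._remove_comments() shown earlier in this diff: comment lines are dropped and the remaining lines are joined with spaces. A standalone restatement of that behavior, mirroring the method in config.py:

    def remove_comments(text):
        # Mirrors config.FileStore._remove_comments(): drop '#' lines,
        # join everything else with spaces, and strip the ends.
        value = ''
        for line in text.split('\n'):
            stripped = line.strip()
            if len(stripped) > 0 and stripped[0] == '#':
                continue
            value += line + ' '
        return value.strip()

    text = '\n# Running service:\nledservice\n# Debugging app\nledprobe\n'
    assert remove_comments(text) == 'ledservice ledprobe'
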
diff --git a/cli/lib/core/image_build.py b/cli/lib/core/image_build.py
index 873b6ee..4088f50 100644
--- a/cli/lib/core/image_build.py
+++ b/cli/lib/core/image_build.py
@@ -37,338 +37,344 @@ IMAGE_TYPES = [IMAGE_TYPE_ODM, IMAGE_TYPE_SYSTEM]
class Error(error.Error):
- """General build failure."""
+ """General build failure."""
class PathError(Error):
- """Raised when a provided path does not meet expectations."""
+ """Raised when a provided path does not meet expectations."""
class ImageTypeError(Error):
- """Raise when an unknown image type is seen."""
+ """Raise when an unknown image type is seen."""
def _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools,
build_tools):
- """Checks that all paths necessary for an image build meet expectations.
-
- Does not validate the contents of any paths.
- Will create an empty output_dir if it doesn't already exist.
-
- Args:
- cache_dir: Directory where user files and system files are merged.
- (This is normally per-target, not the global cache dir.)
- output_dir: Directory where image build output should be placed.
- product_out: Directory where the built platform can be found.
- host_tools: Directory where host tools can be found.
- build_tools: Directory where build tools can be found.
-
- Raises:
- PathError: An expected directory cannot be found or isn't a dir.
- """
- if not os.path.isdir(cache_dir):
- raise PathError('Cache dir "{}" is not a directory.'.format(cache_dir))
- if os.path.isfile(output_dir):
- raise PathError('Can not create output dir "{}", is a file'.format(
- output_dir))
- if not os.path.isdir(product_out):
- raise PathError(
- 'Could not find platform build output "{}". '
- 'Use `bdk build platform` to build it.'.format(
- product_out))
- if not os.path.isdir(host_tools):
- # Probably tampered with/not built.
- raise PathError(
- 'Could not find host tools directory ({}). '
- 'Use `bdk build platform` to build it.'.format(host_tools))
- if not os.path.isdir(build_tools):
- # This should only be missing if it was tampered with.
- raise PathError(
- 'Could not find build tools directory ({}). '
- 'You may need to re-download the OS version.'.format(
- build_tools))
-
- if not os.path.isdir(output_dir):
- os.makedirs(output_dir)
+ """Checks that all paths necessary for an image build meet expectations.
+
+ Does not validate the contents of any paths.
+ Will create an empty output_dir if it doesn't already exist.
+
+ Args:
+ cache_dir: Directory where user files and system files are merged.
+ (This is normally per-target, not the global cache dir.)
+ output_dir: Directory where image build output should be placed.
+ product_out: Directory where the built platform can be found.
+ host_tools: Directory where host tools can be found.
+ build_tools: Directory where build tools can be found.
+
+ Raises:
+ PathError: An expected directory cannot be found or isn't a dir.
+ """
+ if not os.path.isdir(cache_dir):
+ raise PathError('Cache dir "{}" is not a directory.'.format(cache_dir))
+ if os.path.isfile(output_dir):
+ raise PathError('Can not create output dir "{}", is a file'.format(
+ output_dir))
+ if not os.path.isdir(product_out):
+ raise PathError(
+ 'Could not find platform build output "{}". '
+ 'Use `bdk build platform` to build it.'.format(
+ product_out))
+ if not os.path.isdir(host_tools):
+ # Probably tampered with/not built.
+ raise PathError(
+ 'Could not find host tools directory ({}). '
+ 'Use `bdk build platform` to build it.'.format(host_tools))
+ if not os.path.isdir(build_tools):
+ # This should only be missing if it was tampered with.
+ raise PathError(
+ 'Could not find build tools directory ({}). '
+ 'You may need to re-download the OS version.'.format(
+ build_tools))
+
+ if not os.path.isdir(output_dir):
+ os.makedirs(output_dir)
def AddTargetOsPacks(spec, target):
- """Adds target OS packs to the spec.
-
- Args:
- spec: project.project_spec.ProjectSpec to add the OS packs to.
- target: project.target.Target object to get OS info from.
- """
- platform_dir = target.platform_build_cache()
- product_out = util.GetAndroidProductOut(platform_dir, target.board)
- os_packs = packs.Packs()
- os_packs.namespace = target.os_namespace
- os_core = pack.Pack(os_packs.namespace, 'generated_system')
- os_core.add_provides('os.core')
- base_copy = pack.Copy(os_core)
- base_copy.src = os.path.join(product_out, 'system')
- base_copy.src_type = pack.CopyType.DIR
- base_copy.recurse = True
- base_copy.dst = '/system'
- base_copy.dst_type = pack.CopyType.DIR
- # Do not generate fs_config_* or file_context_* for this pack.
- base_copy.acl.override_build = False
- os_core.add_copy(base_copy)
- os_packs.add_pack(os_core)
- spec.add_packs(os_packs)
+ """Adds target OS packs to the spec.
+
+ Args:
+ spec: project.project_spec.ProjectSpec to add the OS packs to.
+ target: project.target.Target object to get OS info from.
+ """
+ platform_dir = target.platform_build_cache()
+ product_out = util.GetAndroidProductOut(platform_dir, target.board)
+ os_packs = packs.Packs()
+ os_packs.namespace = target.os_namespace
+ os_core = pack.Pack(os_packs.namespace, 'generated_system')
+ os_core.add_provides('os.core')
+ base_copy = pack.Copy(os_core)
+ base_copy.src = os.path.join(product_out, 'system')
+ base_copy.src_type = pack.CopyType.DIR
+ base_copy.recurse = True
+ base_copy.dst = '/system'
+ base_copy.dst_type = pack.CopyType.DIR
+ # Do not generate fs_config_* or file_context_* for this pack.
+ base_copy.acl.override_build = False
+ os_core.add_copy(base_copy)
+ os_packs.add_pack(os_core)
+ spec.add_packs(os_packs)
def CreateTargetCache(spec, target, cache_dir, mountpoint='/', update=True,
verbose=True):
- """Copies all files specified in the target to a single cache dir.
-
- This call will create a submap from the parent spec and then use the
- resulting copy destinations to populate the supplied path as if it were
- the root of the destination.
-
- In the future, non-os and non-board packs will likely end up in their
- own image (not /system), so this will create that cache ignoring any
- Copy() nodes owned by a Pack that is prefixed by the target os or board.
-
- Args:
- spec: project.ProjectSpec global specification
- target: project.target.Target to cache
- cache_dir: path to place the root/ of copied files.
- mountpoint: optional path prefix to exclusively cache.
- update: optionally, only replaces files that have changed.
- verbose: (optional). If True, print information about what's
- happening. Default True.
-
- Returns:
- dict of uncached { dst_path => copy }
-
- Raises:
- project.dependency.Error: If there is an unfulfilled dependency.
- PathError: If a required path is missing.
- """
- target_map = target.create_submap(spec.packmap)
-
- cache_root = os.path.join(cache_dir, 'root')
- cache = sysroot.Sysroot(cache_root, copy_newer_only=update)
- fs_config_files = {}
- fs_config_dirs = {}
- file_context = set()
- uncached = {}
-
- # pack.Copy._reconcile_paths() *should* make these always what is seen here.
- # TODO(wad): Add a <link> node.
- allowed_copies = [
- (pack.CopyType.FILE, pack.CopyType.FILE, False),
- (pack.CopyType.GLOB, pack.CopyType.DIR, False),
- (pack.CopyType.GLOB, pack.CopyType.DIR, True),
- (pack.CopyType.DIR, pack.CopyType.DIR, False),
- (pack.CopyType.DIR, pack.CopyType.DIR, True),
- ]
-
- final_files = {}
- for destination, copy in target_map.copy_destinations.iteritems():
- copy = copy[0] # deduping has already occurred.
-
- # Currently, skip unknown mountpoints.
- if not destination.startswith(mountpoint):
- uncached[destination] = copy
- continue
-
- # All destinations are required to be absolute.
- # Make them relative so they play nicely with Sysroot().
- destination = destination.lstrip('/')
-
- # Proactively check allowed combinations so that the error
- # handling isn't interleaved with logic.
- if (copy.src_type, copy.dst_type, copy.recurse) not in allowed_copies:
- # TODO(wad): Should these be FileABugError()s?
- raise PathError(
- '{}: impossible copy reached: {}'.format(copy.pack.origin, copy))
-
- # TODO(arihc): Will os packs ever draw in files from BSPs
- # (other than build output files)? If so, need to link BSP to OS.
- files = set()
- try:
- if copy.src_type == pack.CopyType.FILE: # FILE -> FILE
- cache.Makedirs(os.path.dirname(destination))
- if os.path.islink(copy.src):
- # TODO(wad): Resolve how to skip permission setting on symlinks.
- cache.AddSymlink(copy.src, destination)
- else:
- cache.AddFile(copy.src, destination)
- files.add(destination)
- elif copy.src_type == pack.CopyType.GLOB: # GLOB -> DIR
- files.update(cache.AddGlob(copy.src, destination, recurse=copy.recurse))
- elif copy.src_type == pack.CopyType.DIR: # DIR -> DIR
- files.update(cache.AddDir(copy.src, destination,
- recurse=copy.recurse, symlinks=True))
- fs_config_dirs[destination] = copy.acl.fs_config(binary=True)
- for f in files:
- # Even though the fs_config_files format support globs, the final list
- # of files is known at this point.
- fs_config_files[f] = copy.acl.fs_config(path=f, binary=True)
- if copy.acl.selabel:
- file_context.update([copy.acl.file_context(path=f)])
- final_files[os.path.sep + f] = copy
-
- except IOError as e:
- # Annotate the error with the line that where the <copy> is defined.
- raise PathError('{}: {}'.format(copy.origin, e))
-
- # Walk the cache tree and remove any unmanaged dirs. First
- # we compute all allowed subpaths from the list of copied files, then
- # walk the cache tree and remove any unknown files and directories.
- final_dirs = set('/')
- for dst, copy in final_files.iteritems():
- # Final files is always a full path so let's drop the file.
- dst = os.path.dirname(dst)
- while dst != os.path.sep:
- # TODO(b/27848879) Add <dir> node set-acl support.
- tmp_acl = pack.Copy(None, dst).acl
- tmp_acl.perms = '0555' # Default to rx for dirs.
- tmp_acl.override_build = copy.acl.override_build
- if dst not in fs_config_dirs:
- fs_config_dirs[dst] = '{}'.format(tmp_acl.fs_config(binary=True))
- # The default root map covers file_context for now.
- final_dirs.add(dst)
- dst = os.path.dirname(dst)
-
- all_paths = set()
- all_files = set()
- # Walk the tree as quickly as possible with os.walk.
- # Use set differencing to compute the unmanaged entries.
- for root, _, files in os.walk(cache_root):
- dst_path = root[len(cache_root):] or os.path.sep
- # Don't touch unmanaged files outside of the given mountpoint.
- if not dst_path.startswith(mountpoint):
- continue
- all_paths.add(dst_path)
- all_files.update(set([os.path.join(dst_path, f) for f in files]))
- unmanaged_file = all_files.difference(final_files.keys())
- unmanaged_path = all_paths.difference(final_dirs)
-
- if verbose and (len(unmanaged_file) or len(unmanaged_path)):
- print 'Removing unmanaged paths:\n {}'.format(
- '\n '.join(sorted(unmanaged_file.union(unmanaged_path))))
- for f in unmanaged_file:
- os.remove(os.path.join(cache_root, f.lstrip(os.path.sep)))
- for p in reversed(sorted(unmanaged_path)):
- os.rmdir(os.path.join(cache_root, p.lstrip(os.path.sep)))
-
- # Sort the list from most specific to least specific.
- dsts = sorted(fs_config_dirs.keys(), key=lambda k: k.count(os.path.sep),
- reverse=True)
- sorted_dirs = list()
- for dst in dsts:
- sorted_dirs.append(fs_config_dirs[dst])
-
- # The sysroot was nestled one deep at root/, so meta files
- # can be stored in the parent without a risk of conflict.
- if not os.path.isdir(os.path.join(cache_dir, 'etc')):
- os.makedirs(os.path.join(cache_dir, 'etc'))
- with open(os.path.join(cache_dir, 'etc', 'fs_config_files'), 'w') as f:
- f.write(''.join(fs_config_files.values()))
- with open(os.path.join(cache_dir, 'etc', 'fs_config_dirs'), 'w') as f:
- f.write(''.join(sorted_dirs))
- with open(os.path.join(cache_dir, 'file_contexts'), 'w') as f:
- f.write('\n'.join(file_context))
-
- return uncached
+ """Copies all files specified in the target to a single cache dir.
+
+ This call will create a submap from the parent spec and then use the
+ resulting copy destinations to populate the supplied path as if it were
+ the root of the destination.
+
+ In the future, non-os and non-board packs will likely end up in their
+ own image (not /system), so this will create that cache ignoring any
+ Copy() nodes owned by a Pack that is prefixed by the target os or board.
+
+ Args:
+ spec: project.ProjectSpec global specification
+ target: project.target.Target to cache
+ cache_dir: path to place the root/ of copied files.
+ mountpoint: optional path prefix to exclusively cache.
+ update: optionally, only replaces files that have changed.
+ verbose: (optional). If True, print information about what's
+ happening. Default True.
+
+ Returns:
+ dict of uncached { dst_path => copy }
+
+ Raises:
+ project.dependency.Error: If there is an unfulfilled dependency.
+ PathError: If a required path is missing.
+ """
+ target_map = target.create_submap(spec.packmap)
+
+ cache_root = os.path.join(cache_dir, 'root')
+ cache = sysroot.Sysroot(cache_root, copy_newer_only=update)
+ fs_config_files = {}
+ fs_config_dirs = {}
+ file_context = set()
+ uncached = {}
+
+ # pack.Copy._reconcile_paths() *should* make these always what is seen here.
+ # TODO(wad): Add a <link> node.
+ allowed_copies = [
+ (pack.CopyType.FILE, pack.CopyType.FILE, False),
+ (pack.CopyType.GLOB, pack.CopyType.DIR, False),
+ (pack.CopyType.GLOB, pack.CopyType.DIR, True),
+ (pack.CopyType.DIR, pack.CopyType.DIR, False),
+ (pack.CopyType.DIR, pack.CopyType.DIR, True),
+ ]
+
+ final_files = {}
+ for destination, copy in target_map.copy_destinations.iteritems():
+ copy = copy[0] # deduping has already occurred.
+
+ # Currently, skip unknown mountpoints.
+ if not destination.startswith(mountpoint):
+ uncached[destination] = copy
+ continue
+
+ # All destinations are required to be absolute.
+ # Make them relative so they play nicely with Sysroot().
+ destination = destination.lstrip('/')
+
+ # Proactively check allowed combinations so that the error
+ # handling isn't interleaved with logic.
+ if (copy.src_type, copy.dst_type, copy.recurse) not in allowed_copies:
+ # TODO(wad): Should these be FileABugError()s?
+ raise PathError(
+ '{}: impossible copy reached: {}'.format(copy.pack.origin,
+ copy))
+
+ # TODO(arihc): Will os packs ever draw in files from BSPs
+ # (other than build output files)? If so, need to link BSP to OS.
+ files = set()
+ try:
+ if copy.src_type == pack.CopyType.FILE: # FILE -> FILE
+ cache.Makedirs(os.path.dirname(destination))
+ if os.path.islink(copy.src):
+ # TODO(wad): Resolve how to skip permission setting on
+ # symlinks.
+ cache.AddSymlink(copy.src, destination)
+ else:
+ cache.AddFile(copy.src, destination)
+ files.add(destination)
+ elif copy.src_type == pack.CopyType.GLOB: # GLOB -> DIR
+ files.update(cache.AddGlob(copy.src, destination,
+ recurse=copy.recurse))
+ elif copy.src_type == pack.CopyType.DIR: # DIR -> DIR
+ files.update(cache.AddDir(copy.src, destination,
+ recurse=copy.recurse, symlinks=True))
+ fs_config_dirs[destination] = copy.acl.fs_config(binary=True)
+ for f in files:
+ # Even though the fs_config_files format supports globs, the
+ # final list of files is known at this point.
+ fs_config_files[f] = copy.acl.fs_config(path=f, binary=True)
+ if copy.acl.selabel:
+ file_context.update([copy.acl.file_context(path=f)])
+ final_files[os.path.sep + f] = copy
+
+ except IOError as e:
+ # Annotate the error with the line where the <copy> is defined.
+ raise PathError('{}: {}'.format(copy.origin, e))
+
+ # Walk the cache tree and remove any unmanaged dirs. First
+ # we compute all allowed subpaths from the list of copied files, then
+ # walk the cache tree and remove any unknown files and directories.
+ final_dirs = set('/')
+ for dst, copy in final_files.iteritems():
+ # Final files is always a full path so let's drop the file.
+ dst = os.path.dirname(dst)
+ while dst != os.path.sep:
+ # TODO(b/27848879) Add <dir> node set-acl support.
+ tmp_acl = pack.Copy(None, dst).acl
+ tmp_acl.perms = '0555' # Default to rx for dirs.
+ tmp_acl.override_build = copy.acl.override_build
+ if dst not in fs_config_dirs:
+ fs_config_dirs[dst] = '{}'.format(
+ tmp_acl.fs_config(binary=True))
+ # The default root map covers file_context for now.
+ final_dirs.add(dst)
+ dst = os.path.dirname(dst)
+
+ all_paths = set()
+ all_files = set()
+ # Walk the tree as quickly as possible with os.walk.
+ # Use set differencing to compute the unmanaged entries.
+ for root, _, files in os.walk(cache_root):
+ dst_path = root[len(cache_root):] or os.path.sep
+ # Don't touch unmanaged files outside of the given mountpoint.
+ if not dst_path.startswith(mountpoint):
+ continue
+ all_paths.add(dst_path)
+ all_files.update(set([os.path.join(dst_path, f) for f in files]))
+ unmanaged_file = all_files.difference(final_files.keys())
+ unmanaged_path = all_paths.difference(final_dirs)
+
+ if verbose and (len(unmanaged_file) or len(unmanaged_path)):
+ print 'Removing unmanaged paths:\n {}'.format(
+ '\n '.join(sorted(unmanaged_file.union(unmanaged_path))))
+ for f in unmanaged_file:
+ os.remove(os.path.join(cache_root, f.lstrip(os.path.sep)))
+ for p in reversed(sorted(unmanaged_path)):
+ os.rmdir(os.path.join(cache_root, p.lstrip(os.path.sep)))
+
+ # Sort the list from most specific to least specific.
+ dsts = sorted(fs_config_dirs.keys(), key=lambda k: k.count(os.path.sep),
+ reverse=True)
+ sorted_dirs = list()
+ for dst in dsts:
+ sorted_dirs.append(fs_config_dirs[dst])
+
+ # The sysroot was nestled one deep at root/, so meta files
+ # can be stored in the parent without a risk of conflict.
+ if not os.path.isdir(os.path.join(cache_dir, 'etc')):
+ os.makedirs(os.path.join(cache_dir, 'etc'))
+ with open(os.path.join(cache_dir, 'etc', 'fs_config_files'), 'w') as f:
+ f.write(''.join(fs_config_files.values()))
+ with open(os.path.join(cache_dir, 'etc', 'fs_config_dirs'), 'w') as f:
+ f.write(''.join(sorted_dirs))
+ with open(os.path.join(cache_dir, 'file_contexts'), 'w') as f:
+ f.write('\n'.join(file_context))
+
+ return uncached
def _CreateBuildProps(build_root, product_out, image_type, info_file):
- if image_type == IMAGE_TYPE_ODM:
- build_props = {
- 'mount_point': 'odm',
- 'fs_type': 'ext4',
- # TODO(b/27854052): Get the size in here.
- 'partition_size': '134217728',
- 'extfs_sparse_flag': '-s',
- 'skip_fsck': 'true',
- 'selinux_fc': build_root.Path('root', 'file_contexts.bin'),
- }
- build_root.WriteFile(info_file, '\n'.join(
- ['{}={}'.format(k, v) for k, v in build_props.iteritems()]))
- elif image_type == IMAGE_TYPE_SYSTEM:
- # Add and update a copy of system_image_info.txt
- build_root.AddFile(os.path.join(product_out,
- 'obj',
- 'PACKAGING',
- 'systemimage_intermediates',
- 'system_image_info.txt'), info_file)
- for line in fileinput.input(build_root.Path(info_file), inplace=True):
- # We just want to change the SELinux file contexts.
- if line.startswith('selinux_fc'):
- line = 'selinux_fc=' + build_root.Path('root', 'file_contexts.bin')
- print line.rstrip()
- else:
- build_root.WriteFile(info_file, 'mount_point={}'.format(image_type))
+ if image_type == IMAGE_TYPE_ODM:
+ build_props = {
+ 'mount_point': 'odm',
+ 'fs_type': 'ext4',
+ # TODO(b/27854052): Get the size in here.
+ 'partition_size': '134217728',
+ 'extfs_sparse_flag': '-s',
+ 'skip_fsck': 'true',
+ 'selinux_fc': build_root.Path('root', 'file_contexts.bin'),
+ }
+ build_root.WriteFile(info_file, '\n'.join(
+ ['{}={}'.format(k, v) for k, v in build_props.iteritems()]))
+ elif image_type == IMAGE_TYPE_SYSTEM:
+ # Add and update a copy of system_image_info.txt
+ build_root.AddFile(os.path.join(product_out,
+ 'obj',
+ 'PACKAGING',
+ 'systemimage_intermediates',
+ 'system_image_info.txt'), info_file)
+ for line in fileinput.input(build_root.Path(info_file), inplace=True):
+ # We just want to change the SELinux file contexts.
+ if line.startswith('selinux_fc'):
+ line = 'selinux_fc=' + build_root.Path('root',
+ 'file_contexts.bin')
+ print line.rstrip()
+ else:
+ build_root.WriteFile(info_file, 'mount_point={}'.format(image_type))
def BuildImage(image_type, target, cache_dir, output_dir):
- """Builds the image specified by image_type.
-
- Caller should validate target OS before calling.
-
- Args:
- image_type: The type of the image. One of IMAGE_TYPES.
- target: project.target.Target to build an image of.
- cache_dir: Directory to build the image in. This path
- may be prepared by prior callers. E.g., CreateTargetCache().
- output_dir: Directory to place built .img file in.
-
- Returns:
- The image build exit code.
-
- Raises:
- PathError: An expected directory cannot be found or isn't a dir.
- util.HostUnsupportedArchError: The host is an unsupported architecture.
- ImageTypeError: An invalid parameter value has been supplied for image_type.
- """
- if image_type not in IMAGE_TYPES:
- raise ImageTypeError('image_type must be one of {}: {}'.format(
- IMAGE_TYPES, image_type))
- # Set some useful variables.
- build_tools = util.GetOSPath(target.os_version,
- 'build', 'tools', 'releasetools')
- platform_out = target.platform_build_cache()
- product_out = util.GetAndroidProductOut(platform_out, target.board)
- host_arch = util.GetHostArch()
- host_tools = os.path.join(platform_out, 'host', host_arch, 'bin')
- output_file = os.path.join(output_dir, '{}.img'.format(image_type))
- ret = 1
-
- # Check that all required inputs and tools exist.
- _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools,
- build_tools)
- # Check to ensure there are entries in root/image-mountpoint.
- if not os.path.exists(os.path.join(cache_dir, 'root', image_type)):
- print ('Warning: The specification does not define any destinations '
- 'for this image type: {}'.format(image_type))
-
- build_root = sysroot.Sysroot(cache_dir, copy_newer_only=True)
-
- # Build 'filecontexts.bin' and 'sepolicy' SELinux files.
- with target.get_device().linked(target.os_version):
- policy.BuildSepolicy(target, platform_out, cache_dir)
- policy.BuildFileContexts(target, platform_out, cache_dir)
-
- _CreateBuildProps(build_root, product_out, image_type, 'image_info.txt')
-
- # Build an image from the build root.
- additional_path = host_tools + os.pathsep + build_tools
- if 'PATH' in os.environ:
- os.environ['PATH'] += os.pathsep + additional_path
- else:
- os.environ['PATH'] = additional_path
-
- # Repoint product out so that changes made locally are reflected
- # for the build tooling. This primarily enables the discovery
- # of etc/fs_config_{files, dirs}.
- product_out = build_root.Path()
- ret = subprocess.call(['build_image.py',
- build_root.Path('root', image_type),
- build_root.Path('image_info.txt'),
- output_file,
- product_out])
- return ret
+ """Builds the image specified by image_type.
+
+ Caller should validate target OS before calling.
+
+ Args:
+ image_type: The type of the image. One of IMAGE_TYPES.
+ target: project.target.Target to build an image of.
+ cache_dir: Directory to build the image in. This path
+ may be prepared by prior callers. E.g., CreateTargetCache().
+ output_dir: Directory to place built .img file in.
+
+ Returns:
+ The image build exit code.
+
+ Raises:
+ PathError: An expected directory cannot be found or isn't a dir.
+ util.HostUnsupportedArchError: The host is an unsupported architecture.
+ ImageTypeError: An invalid parameter value has been supplied for
+ image_type.
+ """
+ if image_type not in IMAGE_TYPES:
+ raise ImageTypeError('image_type must be one of {}: {}'.format(
+ IMAGE_TYPES, image_type))
+ # Set some useful variables.
+ build_tools = util.GetOSPath(target.os_version,
+ 'build', 'tools', 'releasetools')
+ platform_out = target.platform_build_cache()
+ product_out = util.GetAndroidProductOut(platform_out, target.board)
+ host_arch = util.GetHostArch()
+ host_tools = os.path.join(platform_out, 'host', host_arch, 'bin')
+ output_file = os.path.join(output_dir, '{}.img'.format(image_type))
+ ret = 1
+
+ # Check that all required inputs and tools exist.
+ _CheckBuildImagePaths(cache_dir, output_dir, product_out, host_tools,
+ build_tools)
+ # Check to ensure there are entries in root/image-mountpoint.
+ if not os.path.exists(os.path.join(cache_dir, 'root', image_type)):
+ print ('Warning: The specification does not define any destinations '
+ 'for this image type: {}'.format(image_type))
+
+ build_root = sysroot.Sysroot(cache_dir, copy_newer_only=True)
+
+ # Build 'filecontexts.bin' and 'sepolicy' SELinux files.
+ with target.get_device().linked(target.os_version):
+ policy.BuildSepolicy(target, platform_out, cache_dir)
+ policy.BuildFileContexts(target, platform_out, cache_dir)
+
+ _CreateBuildProps(build_root, product_out, image_type, 'image_info.txt')
+
+ # Build an image from the build root.
+ additional_path = host_tools + os.pathsep + build_tools
+ if 'PATH' in os.environ:
+ os.environ['PATH'] += os.pathsep + additional_path
+ else:
+ os.environ['PATH'] = additional_path
+
+ # Repoint product out so that changes made locally are reflected
+ # for the build tooling. This primarily enables the discovery
+ # of etc/fs_config_{files, dirs}.
+ product_out = build_root.Path()
+ ret = subprocess.call(['build_image.py',
+ build_root.Path('root', image_type),
+ build_root.Path('image_info.txt'),
+ output_file,
+ product_out])
+ return ret
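
For context, the odm branch of _CreateBuildProps above serializes a small dict of build properties as key=value lines into image_info.txt, which build_image.py then consumes. A rough sketch of that serialization (key order is arbitrary because a plain dict is used, and the selinux_fc path here is a placeholder, not a path from this change):

    build_props = {
        'mount_point': 'odm',
        'fs_type': 'ext4',
        'partition_size': '134217728',
        'extfs_sparse_flag': '-s',
        'skip_fsck': 'true',
        'selinux_fc': '/path/to/cache/root/file_contexts.bin',  # placeholder
    }
    # Equivalent to the build_root.WriteFile(info_file, ...) call in the function.
    print('\n'.join('{}={}'.format(k, v) for k, v in build_props.items()))
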
diff --git a/cli/lib/core/image_build_unittest.py b/cli/lib/core/image_build_unittest.py
index 3803845..d6b7782 100644
--- a/cli/lib/core/image_build_unittest.py
+++ b/cli/lib/core/image_build_unittest.py
@@ -35,477 +35,483 @@ from selinux import policy_stub
class TestData(object):
- _BSP = 'test_board'
- _BSP_VERSION = '0.1'
- _BUILD_TYPE = 'user'
- _BRILLO_VERSION = '9.9'
- _CACHE_DIR = '/product/artifacts'
- _PLATFORM_DIR = '/foo/bar'
- _IMAGE_OUT_DIR = '/baz/pop'
+ _BSP = 'test_board'
+ _BSP_VERSION = '0.1'
+ _BUILD_TYPE = 'user'
+ _BRILLO_VERSION = '9.9'
+ _CACHE_DIR = '/product/artifacts'
+ _PLATFORM_DIR = '/foo/bar'
+ _IMAGE_OUT_DIR = '/baz/pop'
class BuildImageBase(TestData):
- """Base image unit test setup.
-
- Separate the setup from the default tests, so that BuildUnsupportedImageTest()
- can instantiate this object in its test.
- """
-
- def setUp(self):
- self.stub_os = stubs.StubOs()
-
- self.stub_fileinput = stubs.StubFileinput()
- self.stub_open = stubs.StubOpen(self.stub_os)
- self.stub_selinux = policy_stub.StubPolicy()
- self.stub_subprocess = stubs.StubSubprocess()
- self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator()
- self.stub_util = util_stub.StubUtil(os_version=self._BRILLO_VERSION)
-
- image_build.fileinput = self.stub_fileinput
- image_build.os = self.stub_os
- image_build.open = self.stub_open.open
- image_build.policy = self.stub_selinux
- image_build.subprocess = self.stub_subprocess
- image_build.sysroot = self.stub_sysroot_generator
- image_build.util = self.stub_util
-
- self.device = device_stub.StubDevice(
- should_link_version=self._BRILLO_VERSION)
- self.target = target_stub.StubTarget(
- board=self._BSP, os_version=self._BRILLO_VERSION, device=self.device)
- self.target.platform_dir = self._PLATFORM_DIR
-
- self.product_out_dir = None
- self.image_type = None
-
- def SetupForImageImage_Build(self, image_type='subclassme'):
- """Helper: establishes pre-reqs for successful image image_build.
-
- Returns the expected subprocess call.
+ """Base image unit test setup.
+
+ Separate the setup from the default tests, so that
+ BuildUnsupportedImageTest() can instantiate this object in its test.
"""
- if image_type is 'subclassme':
- raise unittest.SkipTest('not in subclass')
- self.image_type = image_type
-
- # Must be linux
- self.stub_os.SetUname(('linux', '', '', '', ''))
-
- # Must have copy dir, product out dir, image out dir,
- # host tools, and image_build tools.
- platform_dir = self.target.platform_build_cache()
- product_out_dir = self.stub_util.GetAndroidProductOut(
- platform_dir, self.target.board)
- self.product_out_dir = product_out_dir
- self.stub_os.path.should_be_dir = [
- product_out_dir, self._IMAGE_OUT_DIR, self._CACHE_DIR,
- self.target.platform_build_cache('host',
- self.stub_util.GetHostArch(), 'bin'),
- self.stub_util.GetOSPath(self._BRILLO_VERSION,
- 'build', 'tools', 'releasetools')]
- self.stub_os.path.should_exist += copy.deepcopy(
- self.stub_os.path.should_be_dir)
-
- # Willing to make an image_build root in the cache dir.
- self.stub_os.should_makedirs = [self._CACHE_DIR]
- # Image_Build root should get system_image_info.txt, and modify it.
- self.stub_fileinput.should_touch = [self.stub_os.path.join(
- self._CACHE_DIR, 'image_info.txt')]
-
- expected_call = [
- 'build_image.py',
- self.stub_os.path.join(self._CACHE_DIR, 'root', image_type),
- self.stub_os.path.join(self._CACHE_DIR, 'image_info.txt'),
- self.stub_os.path.join(
- self._IMAGE_OUT_DIR, '{}.img'.format(image_type)),
- self._CACHE_DIR]
-
- return expected_call
+
+ def setUp(self):
+ self.stub_os = stubs.StubOs()
+
+ self.stub_fileinput = stubs.StubFileinput()
+ self.stub_open = stubs.StubOpen(self.stub_os)
+ self.stub_selinux = policy_stub.StubPolicy()
+ self.stub_subprocess = stubs.StubSubprocess()
+ self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator()
+ self.stub_util = util_stub.StubUtil(os_version=self._BRILLO_VERSION)
+
+ image_build.fileinput = self.stub_fileinput
+ image_build.os = self.stub_os
+ image_build.open = self.stub_open.open
+ image_build.policy = self.stub_selinux
+ image_build.subprocess = self.stub_subprocess
+ image_build.sysroot = self.stub_sysroot_generator
+ image_build.util = self.stub_util
+
+ self.device = device_stub.StubDevice(
+ should_link_version=self._BRILLO_VERSION)
+ self.target = target_stub.StubTarget(
+ board=self._BSP, os_version=self._BRILLO_VERSION,
+ device=self.device)
+ self.target.platform_dir = self._PLATFORM_DIR
+
+ self.product_out_dir = None
+ self.image_type = None
+
+ def SetupForImageImage_Build(self, image_type='subclassme'):
+ """Helper: establishes pre-reqs for successful image image_build.
+
+ Returns the expected subprocess call.
+ """
+        if image_type == 'subclassme':
+ raise unittest.SkipTest('not in subclass')
+ self.image_type = image_type
+
+ # Must be linux
+ self.stub_os.SetUname(('linux', '', '', '', ''))
+
+ # Must have copy dir, product out dir, image out dir,
+ # host tools, and image_build tools.
+ platform_dir = self.target.platform_build_cache()
+ product_out_dir = self.stub_util.GetAndroidProductOut(
+ platform_dir, self.target.board)
+ self.product_out_dir = product_out_dir
+ self.stub_os.path.should_be_dir = [
+ product_out_dir, self._IMAGE_OUT_DIR, self._CACHE_DIR,
+ self.target.platform_build_cache('host',
+ self.stub_util.GetHostArch(),
+ 'bin'),
+ self.stub_util.GetOSPath(self._BRILLO_VERSION,
+ 'build', 'tools', 'releasetools')]
+ self.stub_os.path.should_exist += copy.deepcopy(
+ self.stub_os.path.should_be_dir)
+
+ # Willing to make an image_build root in the cache dir.
+ self.stub_os.should_makedirs = [self._CACHE_DIR]
+ # Image_Build root should get system_image_info.txt, and modify it.
+ self.stub_fileinput.should_touch = [self.stub_os.path.join(
+ self._CACHE_DIR, 'image_info.txt')]
+
+ expected_call = [
+ 'build_image.py',
+ self.stub_os.path.join(self._CACHE_DIR, 'root', image_type),
+ self.stub_os.path.join(self._CACHE_DIR, 'image_info.txt'),
+ self.stub_os.path.join(
+ self._IMAGE_OUT_DIR, '{}.img'.format(image_type)),
+ self._CACHE_DIR]
+
+ return expected_call
class BaseTests(object):
- """Container for the base image tests.
-
- Put the base test class within a subclass. Otherwise the unit test
- discover function picks them up as actual tests but marks them as skipped.
- """
-
- class BuildImageBaseTest(BuildImageBase, unittest.TestCase):
- """Base set of tests to run for each image type."""
-
- def test_image_success(self):
- expected_call = self.SetupForImageImage_Build()
- command = self.stub_subprocess.AddCommand()
- self.assertEqual(
- 0,
- image_build.BuildImage(self.image_type, self.target,
- self._CACHE_DIR,
- self._IMAGE_OUT_DIR))
-
- command.AssertCallWas(expected_call)
-
- def test_image_return_exit_code(self):
- """Tests that the make exit code is returned."""
- self.stub_subprocess.AddCommand(ret_code=1)
- self.SetupForImageImage_Build()
- self.assertEqual(
- 1,
- image_build.BuildImage(self.image_type, self.target,
- self._CACHE_DIR,
- self._IMAGE_OUT_DIR))
-
- def test_image_missing_paths(self):
- self.SetupForImageImage_Build()
- original_dirs = copy.deepcopy(self.stub_os.path.should_be_dir)
- for directory in original_dirs:
- self.stub_subprocess.AddCommand()
- # Take a dir out.
- self.stub_os.path.should_be_dir.remove(directory)
- self.stub_os.path.should_exist.remove(directory)
- # Check that we fail on all except the output dir.
- if directory != self._IMAGE_OUT_DIR:
- with self.assertRaises(image_build.PathError):
- image_build.BuildImage(
- self.image_type, self.target,
- self._CACHE_DIR, self._IMAGE_OUT_DIR)
- else:
- self.stub_os.should_makedirs.append(self._IMAGE_OUT_DIR)
- self.assertEqual(
- 0,
- image_build.BuildImage(self.image_type, self.target,
- self._CACHE_DIR,
- self._IMAGE_OUT_DIR))
- # Put it back in.
- self.stub_os.path.should_be_dir.append(directory)
-
- def test_image_bad_host(self):
- self.SetupForImageImage_Build()
- self.stub_util.arch_is_supported = False
- self.stub_subprocess.AddCommand()
- # Shouldn't reach the linking point.
- self.device.should_link_version = None
- with self.assertRaises(self.stub_util.HostUnsupportedArchError):
- image_build.BuildImage(self.image_type, self.target,
- self._CACHE_DIR, self._IMAGE_OUT_DIR)
+ """Container for the base image tests.
+
+    Put the base test class within a container class. Otherwise unittest
+    discovery picks it up as an actual test and reports it as skipped.
+ """
+
+ class BuildImageBaseTest(BuildImageBase, unittest.TestCase):
+ """Base set of tests to run for each image type."""
+
+ def test_image_success(self):
+ expected_call = self.SetupForImageImage_Build()
+ command = self.stub_subprocess.AddCommand()
+ self.assertEqual(
+ 0,
+ image_build.BuildImage(self.image_type, self.target,
+ self._CACHE_DIR, self._IMAGE_OUT_DIR))
+
+ command.AssertCallWas(expected_call)
+
+ def test_image_return_exit_code(self):
+ """Tests that the make exit code is returned."""
+ self.stub_subprocess.AddCommand(ret_code=1)
+ self.SetupForImageImage_Build()
+ self.assertEqual(
+ 1,
+ image_build.BuildImage(self.image_type, self.target,
+ self._CACHE_DIR, self._IMAGE_OUT_DIR))
+
+ def test_image_missing_paths(self):
+ self.SetupForImageImage_Build()
+ original_dirs = copy.deepcopy(self.stub_os.path.should_be_dir)
+ for directory in original_dirs:
+ self.stub_subprocess.AddCommand()
+ # Take a dir out.
+ self.stub_os.path.should_be_dir.remove(directory)
+ self.stub_os.path.should_exist.remove(directory)
+ # Check that we fail on all except the output dir.
+ if directory != self._IMAGE_OUT_DIR:
+ with self.assertRaises(image_build.PathError):
+ image_build.BuildImage(
+ self.image_type, self.target,
+ self._CACHE_DIR, self._IMAGE_OUT_DIR)
+ else:
+ self.stub_os.should_makedirs.append(self._IMAGE_OUT_DIR)
+ self.assertEqual(
+ 0,
+ image_build.BuildImage(self.image_type, self.target,
+ self._CACHE_DIR,
+ self._IMAGE_OUT_DIR))
+ # Put it back in.
+ self.stub_os.path.should_be_dir.append(directory)
+
+ def test_image_bad_host(self):
+ self.SetupForImageImage_Build()
+ self.stub_util.arch_is_supported = False
+ self.stub_subprocess.AddCommand()
+ # Shouldn't reach the linking point.
+ self.device.should_link_version = None
+ with self.assertRaises(self.stub_util.HostUnsupportedArchError):
+ image_build.BuildImage(self.image_type, self.target,
+ self._CACHE_DIR, self._IMAGE_OUT_DIR)
class BuildUnsupportedImageTest(unittest.TestCase):
- def test_unsupported_type(self):
- bt = BuildImageBase()
- bt.setUp()
- bt.SetupForImageImage_Build('unknown')
- with self.assertRaises(image_build.ImageTypeError):
- # pylint: disable=protected-access
- image_build.BuildImage(bt.image_type, bt.target,
- bt._CACHE_DIR, bt._IMAGE_OUT_DIR)
+ def test_unsupported_type(self):
+ bt = BuildImageBase()
+ bt.setUp()
+ bt.SetupForImageImage_Build('unknown')
+ with self.assertRaises(image_build.ImageTypeError):
+ # pylint: disable=protected-access
+ image_build.BuildImage(bt.image_type, bt.target,
+ bt._CACHE_DIR, bt._IMAGE_OUT_DIR)
class BuildOdmImageTest(BaseTests.BuildImageBaseTest):
- def SetupForImageImage_Build(self, image_type='odm'):
- """Helper: establishes pre-reqs for successful image image_build.
+ def SetupForImageImage_Build(self, image_type='odm'):
+ """Helper: establishes pre-reqs for successful image image_build.
- Returns the expected subprocess call.
- """
- call = super(BuildOdmImageTest, self).SetupForImageImage_Build('odm')
- self.stub_sysroot_generator.should_write += ['image_info.txt']
- return call
+ Returns the expected subprocess call.
+ """
+ call = super(BuildOdmImageTest, self).SetupForImageImage_Build('odm')
+ self.stub_sysroot_generator.should_write += ['image_info.txt']
+ return call
class BuildSystemImageTest(BaseTests.BuildImageBaseTest):
- def SetupForImageImage_Build(self, image_type='system'):
- """Helper: establishes pre-reqs for successful image image_build.
-
- Returns the expected subprocess call.
- """
- expected_call = super(BuildSystemImageTest, self).SetupForImageImage_Build(
- 'system')
- # Make sure the built system is copied to system.
- self.stub_sysroot_generator.should_add_dir += [
- (self.stub_os.path.join(self.product_out_dir, 'system'),
- 'root/system', True)
- ]
- # Image_Build root should get system_image_info.txt, and modify it.
- self.stub_sysroot_generator.should_add_file = [
- (self.stub_os.path.join(self.product_out_dir, 'obj', 'PACKAGING',
- 'systemimage_intermediates',
- 'system_image_info.txt'), 'image_info.txt')]
- return expected_call
+ def SetupForImageImage_Build(self, image_type='system'):
+ """Helper: establishes pre-reqs for successful image image_build.
+
+ Returns the expected subprocess call.
+ """
+ expected_call = super(BuildSystemImageTest,
+ self).SetupForImageImage_Build('system')
+ # Make sure the built system is copied to system.
+ self.stub_sysroot_generator.should_add_dir += [
+ (self.stub_os.path.join(self.product_out_dir, 'system'),
+ 'root/system', True)
+ ]
+ # Image_Build root should get system_image_info.txt, and modify it.
+ self.stub_sysroot_generator.should_add_file = [
+ (self.stub_os.path.join(self.product_out_dir, 'obj',
+ 'PACKAGING',
+ 'systemimage_intermediates',
+ 'system_image_info.txt'),
+ 'image_info.txt')]
+ return expected_call
class CreateTargetCacheTest(TestData, unittest.TestCase):
- def setUp(self):
- self.spec = project_spec_stub.StubProjectSpec()
- self.target = None
- self.cache_dir = '/base/path'
-
- p = pack.Pack('my_project', 'main')
- self.copy = pack.Copy(p)
- self.copy.src = '/some/source.txt'
- self.copy.src_type = pack.CopyType.FILE
-
- self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator()
- self.stub_os = stubs.StubOs()
- self.stub_open = stubs.StubOpen(self.stub_os)
- self.stub_glob = stubs.StubGlob(self.stub_os)
-
- image_build.glob = self.stub_glob
- image_build.os = self.stub_os
- image_build.open = self.stub_open.open
- image_build.sysroot = self.stub_sysroot_generator
-
- self.metadata_files = [
- self.stub_os.path.join(self.cache_dir, 'etc'),
- self.cache_dir,
- self.stub_os.path.sep + self.cache_dir.split(self.stub_os.path.sep)[1],
- self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_files'),
- self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_dirs'),
- self.stub_os.path.join(self.cache_dir, 'file_contexts'),
- ]
- # This always happens unless it fails early.
- self.stub_os.should_makedirs += [
- self.stub_os.path.join(self.cache_dir, 'etc')]
-
- def tearDown(self):
- # Make sure all files are copied.
- for gen in self.stub_sysroot_generator.sysroots:
- self.assertEqual(gen.should_add_file, [])
- self.assertEqual(gen.should_add_dir, [])
- self.assertEqual(gen.should_add_glob, [])
-
- def test_dependency_error(self):
- target = target_stub.StubTarget(board=self._BSP,
- submap_raises=dependency.Error('dep error'))
- with self.assertRaises(dependency.Error):
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
-
- def test_skip_os(self):
- p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
- simple_map = packmap_stub.StubPackMap(
- destinations={'/system/bin/servicemanager': [pack.Copy(p)]})
- target = target_stub.StubTarget(
- os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
- submaps=[simple_map])
- # sysroot stub will not be touched because the OS will be under /system.
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- mountpoint='/odm')
-
- def test_with_os(self):
- p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
- cpy = pack.Copy(p,
- dst='/system/bin/servicemanager', dst_type=pack.CopyType.FILE,
- src='/tmp/a_file', src_type=pack.CopyType.FILE)
- cpy.override_build = False
- simple_map = packmap_stub.StubPackMap(
- destinations={cpy.dst: [cpy]})
- target = target_stub.StubTarget(
- os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
- submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(cpy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(cpy.src,
- cpy.dst.lstrip('/'))]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- mountpoint='/')
-
- def test_skip_nonmountpoint(self):
- self.copy.dst = '/system/bin/xyzzy.txt'
- self.copy.dst_type = pack.CopyType.FILE
- odm_copy = pack.Copy(self.copy.pack)
- odm_copy.dst = '/odm/bin/xyzzy.txt'
- odm_copy.dst_type = pack.CopyType.FILE
- odm_copy.src = self.copy.src
- odm_copy.src_type = self.copy.src_type
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy], odm_copy.dst: [odm_copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(odm_copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(odm_copy.src,
- 'odm/bin/xyzzy.txt')]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- mountpoint='/odm')
-
- def test_copy_file_src_file_dst(self):
- self.copy.dst = '/system/bin/xyzzy.txt'
- self.copy.dst_type = pack.CopyType.FILE
- self.copy.acl.selabel = 'u:object_r:system_foo:s0'
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(self.copy.src,
- 'system/bin/xyzzy.txt')]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
- # Ensure the selabel was copied.
- file_contexts = self.stub_open.files[
- self.stub_os.path.join(self.cache_dir, 'file_contexts')].contents
- self.assertRegexpMatches('\n'.join(file_contexts),
- '{}($|\n)'.format(self.copy.acl.file_context()))
- # Ensure the ACLs were copied too.
- fs_config_files = self.stub_open.files[
- self.stub_os.path.join(self.cache_dir, 'etc',
- 'fs_config_files')].contents
- self.assertRegexpMatches(
- '\n'.join(fs_config_files),
- '{}($|\n)'.format(self.copy.acl.fs_config(binary=True)))
-
- def test_copy_system_with_custom_deep_file(self):
- # When copying a "system" pack, we rely on the compiled in fs_config_*
- # so let's make sure we don't override it.
-
- # A custom file.
- self.copy.dst = '/system/bin/xyzzy.txt'
- self.copy.dst_type = pack.CopyType.FILE
- self.copy.acl.user = 1234
- self.copy.acl.group = 4321
-
- p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
- cpy = pack.Copy(p,
- dst='/system/bin/servicemanager', dst_type=pack.CopyType.FILE,
- src='/tmp/a_file', src_type=pack.CopyType.FILE)
- cpy.acl.override_build = False
- simple_map = packmap_stub.StubPackMap(
- destinations={cpy.dst: [cpy],
- self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(
- os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
- submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:],
- self.stub_os.path.dirname(cpy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(cpy.src,
- cpy.dst.lstrip('/')),
- (self.copy.src,
- 'system/bin/xyzzy.txt')]
-
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- mountpoint='/')
-
- # Ensure the ACLs were copied.
- fs_config_files = self.stub_open.files[
- self.stub_os.path.join(self.cache_dir, 'etc',
- 'fs_config_files')].contents
- self.assertEqual(
- ['%\x00\x00\x01\xd2\x04\xe1\x10\x00\x00\x00\x00\x00\x00\x00\x00system/bin/xyzzy.txt\x00'],
- fs_config_files)
- # Ensure that no directory ACLs were implicitly placed on
- # system or system/bin/.
- fs_config_dirs = self.stub_open.files[
- self.stub_os.path.join(self.cache_dir, 'etc',
- 'fs_config_dirs')].contents
- self.assertEqual(fs_config_dirs, [''])
-
- def test_copy_glob(self):
- self.copy.src = '/my/bins/*'
- self.copy.src_type = pack.CopyType.GLOB
- self.copy.recurse = False
- self.copy.dst = '/system/bin/'
- self.copy.dst_type = pack.CopyType.DIR
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_glob = [
- (self.copy.src, 'system/bin/', False)]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
-
- def test_copy_glob_recursive(self):
- self.copy.src = '/my/bins/*'
- self.copy.src_type = pack.CopyType.GLOB
- self.copy.recurse = True
- self.copy.dst = '/system/bin/'
- self.copy.dst_type = pack.CopyType.DIR
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_glob = [
- (self.copy.src, 'system/bin/', True)]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
-
- def test_copy_dir(self):
- self.copy.src = '/my/etc/'
- self.copy.src_type = pack.CopyType.DIR
- self.copy.recurse = False
- self.copy.dst = '/system/etc/'
- self.copy.dst_type = pack.CopyType.DIR
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_dir = [
- (self.copy.src, 'system/etc/', False)]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
-
- def test_copy_dir_recurse(self):
- self.copy.src = '/my/etc/'
- self.copy.src_type = pack.CopyType.DIR
- self.copy.recurse = True
- self.copy.dst = '/system/etc/'
- self.copy.dst_type = pack.CopyType.DIR
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_dir = [
- (self.copy.src, 'system/etc/', True)]
- image_build.CreateTargetCache(self.spec, target, self.cache_dir)
-
- def test_copy_invalid_combos(self):
- combos = [
- (pack.CopyType.DIR, pack.CopyType.FILE, False),
- (pack.CopyType.DIR, pack.CopyType.FILE, True),
- (pack.CopyType.FILE, pack.CopyType.GLOB, False),
- (pack.CopyType.FILE, pack.CopyType.GLOB, True),
- (pack.CopyType.GLOB, pack.CopyType.FILE, False),
- (pack.CopyType.GLOB, pack.CopyType.FILE, True),
- ]
- for tup in combos:
- self.copy.src_type = tup[0]
- self.copy.dst_type = tup[1]
- self.copy.dst = '/system/bin/xyzzy.txt'
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- with self.assertRaises(image_build.PathError):
+ def setUp(self):
+ self.spec = project_spec_stub.StubProjectSpec()
+ self.target = None
+ self.cache_dir = '/base/path'
+
+ p = pack.Pack('my_project', 'main')
+ self.copy = pack.Copy(p)
+ self.copy.src = '/some/source.txt'
+ self.copy.src_type = pack.CopyType.FILE
+
+ self.stub_sysroot_generator = sysroot_stub.StubSysrootGenerator()
+ self.stub_os = stubs.StubOs()
+ self.stub_open = stubs.StubOpen(self.stub_os)
+ self.stub_glob = stubs.StubGlob(self.stub_os)
+
+ image_build.glob = self.stub_glob
+ image_build.os = self.stub_os
+ image_build.open = self.stub_open.open
+ image_build.sysroot = self.stub_sysroot_generator
+
+ self.metadata_files = [
+ self.stub_os.path.join(self.cache_dir, 'etc'),
+ self.cache_dir,
+ self.stub_os.path.sep +
+ self.cache_dir.split(self.stub_os.path.sep)[1],
+ self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_files'),
+ self.stub_os.path.join(self.cache_dir, 'etc', 'fs_config_dirs'),
+ self.stub_os.path.join(self.cache_dir, 'file_contexts'),
+ ]
+ # This always happens unless it fails early.
+ self.stub_os.should_makedirs += [
+ self.stub_os.path.join(self.cache_dir, 'etc')]
+
+ def tearDown(self):
+ # Make sure all files are copied.
+ for gen in self.stub_sysroot_generator.sysroots:
+ self.assertEqual(gen.should_add_file, [])
+ self.assertEqual(gen.should_add_dir, [])
+ self.assertEqual(gen.should_add_glob, [])
+
+ def test_dependency_error(self):
+ target = target_stub.StubTarget(
+ board=self._BSP, submap_raises=dependency.Error('dep error'))
+ with self.assertRaises(dependency.Error):
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+
+ def test_skip_os(self):
+ p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
+ simple_map = packmap_stub.StubPackMap(
+ destinations={'/system/bin/servicemanager': [pack.Copy(p)]})
+ target = target_stub.StubTarget(
+ os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
+ submaps=[simple_map])
+ # sysroot stub will not be touched because the OS will be under /system.
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ mountpoint='/odm')
+
+ def test_with_os(self):
+ p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
+ cpy = pack.Copy(p, dst='/system/bin/servicemanager',
+ dst_type=pack.CopyType.FILE, src='/tmp/a_file',
+ src_type=pack.CopyType.FILE)
+ cpy.override_build = False
+ simple_map = packmap_stub.StubPackMap(
+ destinations={cpy.dst: [cpy]})
+ target = target_stub.StubTarget(
+ os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
+ submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(cpy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(cpy.src,
+ cpy.dst.lstrip('/'))]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ mountpoint='/')
+
+ def test_skip_nonmountpoint(self):
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ self.copy.dst_type = pack.CopyType.FILE
+ odm_copy = pack.Copy(self.copy.pack)
+ odm_copy.dst = '/odm/bin/xyzzy.txt'
+ odm_copy.dst_type = pack.CopyType.FILE
+ odm_copy.src = self.copy.src
+ odm_copy.src_type = self.copy.src_type
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy], odm_copy.dst: [odm_copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(odm_copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(odm_copy.src,
+ 'odm/bin/xyzzy.txt')]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ mountpoint='/odm')
+
+ def test_copy_file_src_file_dst(self):
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ self.copy.dst_type = pack.CopyType.FILE
+ self.copy.acl.selabel = 'u:object_r:system_foo:s0'
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(self.copy.src,
+ 'system/bin/xyzzy.txt')]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+ # Ensure the selabel was copied.
+ file_contexts = self.stub_open.files[
+ self.stub_os.path.join(self.cache_dir, 'file_contexts')].contents
+ self.assertRegexpMatches(
+ '\n'.join(file_contexts),
+ '{}($|\n)'.format(self.copy.acl.file_context()))
+ # Ensure the ACLs were copied too.
+ fs_config_files = self.stub_open.files[
+ self.stub_os.path.join(self.cache_dir, 'etc',
+ 'fs_config_files')].contents
+ self.assertRegexpMatches(
+ '\n'.join(fs_config_files),
+ '{}($|\n)'.format(self.copy.acl.fs_config(binary=True)))
+
+ def test_copy_system_with_custom_deep_file(self):
+ # When copying a "system" pack, we rely on the compiled in fs_config_*
+ # so let's make sure we don't override it.
+
+ # A custom file.
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ self.copy.dst_type = pack.CopyType.FILE
+ self.copy.acl.user = 1234
+ self.copy.acl.group = 4321
+
+ p = pack.Pack('brillo.{}'.format(self._BRILLO_VERSION), 'some_os_stuff')
+ cpy = pack.Copy(p, dst='/system/bin/servicemanager',
+ dst_type=pack.CopyType.FILE, src='/tmp/a_file',
+ src_type=pack.CopyType.FILE)
+ cpy.acl.override_build = False
+ simple_map = packmap_stub.StubPackMap(
+ destinations={cpy.dst: [cpy],
+ self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(
+ os='brillo', os_version=self._BRILLO_VERSION, board=self._BSP,
+ submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:],
+ self.stub_os.path.dirname(cpy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(cpy.src,
+ cpy.dst.lstrip('/')),
+ (self.copy.src,
+ 'system/bin/xyzzy.txt')]
+
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ mountpoint='/')
+
+ # Ensure the ACLs were copied.
+ fs_config_files = self.stub_open.files[
+ self.stub_os.path.join(self.cache_dir, 'etc',
+ 'fs_config_files')].contents
+ self.assertEqual(
+ ['%\x00\x00\x01\xd2\x04\xe1\x10\x00\x00\x00\x00\x00\x00\x00\x00'
+ 'system/bin/xyzzy.txt\x00'],
+ fs_config_files)
+ # Ensure that no directory ACLs were implicitly placed on
+ # system or system/bin/.
+ fs_config_dirs = self.stub_open.files[
+ self.stub_os.path.join(self.cache_dir, 'etc',
+ 'fs_config_dirs')].contents
+ self.assertEqual(fs_config_dirs, [''])
+
+ def test_copy_glob(self):
+ self.copy.src = '/my/bins/*'
+ self.copy.src_type = pack.CopyType.GLOB
+ self.copy.recurse = False
+ self.copy.dst = '/system/bin/'
+ self.copy.dst_type = pack.CopyType.DIR
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_glob = [
+ (self.copy.src, 'system/bin/', False)]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+
+ def test_copy_glob_recursive(self):
+ self.copy.src = '/my/bins/*'
+ self.copy.src_type = pack.CopyType.GLOB
+ self.copy.recurse = True
+ self.copy.dst = '/system/bin/'
+ self.copy.dst_type = pack.CopyType.DIR
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_glob = [
+ (self.copy.src, 'system/bin/', True)]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+
+ def test_copy_dir(self):
+ self.copy.src = '/my/etc/'
+ self.copy.src_type = pack.CopyType.DIR
+ self.copy.recurse = False
+ self.copy.dst = '/system/etc/'
+ self.copy.dst_type = pack.CopyType.DIR
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_dir = [
+ (self.copy.src, 'system/etc/', False)]
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+
+ def test_copy_dir_recurse(self):
+ self.copy.src = '/my/etc/'
+ self.copy.src_type = pack.CopyType.DIR
+ self.copy.recurse = True
+ self.copy.dst = '/system/etc/'
+ self.copy.dst_type = pack.CopyType.DIR
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_dir = [
+ (self.copy.src, 'system/etc/', True)]
image_build.CreateTargetCache(self.spec, target, self.cache_dir)
- def test_unmanaged_file(self):
- self.copy.dst = '/system/bin/xyzzy.txt'
- self.copy.dst_type = pack.CopyType.FILE
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(self.copy.src,
- 'system/bin/xyzzy.txt')]
- self.stub_os.path.should_exist.append('/base/path/root/system/foozle/blag')
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- verbose=False)
- # The metadata files should've been created.
- expect_files = self.metadata_files
- self.assertEqual(self.stub_os.path.should_exist, expect_files)
-
- def test_unmanaged_dir_dir_file(self):
- self.copy.dst = '/system/bin/xyzzy.txt'
- self.copy.dst_type = pack.CopyType.FILE
- simple_map = packmap_stub.StubPackMap(
- destinations={self.copy.dst: [self.copy]})
- target = target_stub.StubTarget(submaps=[simple_map])
- self.stub_sysroot_generator.should_makedirs = [
- self.stub_os.path.dirname(self.copy.dst)[1:]]
- self.stub_sysroot_generator.should_add_file = [(self.copy.src,
- 'system/bin/xyzzy.txt')]
- self.stub_os.path.should_exist.append(
- '/base/path/root/system/foozle/barzle/blag.mal')
- image_build.CreateTargetCache(self.spec, target, self.cache_dir,
- verbose=False)
- expect_files = self.metadata_files
- self.assertEqual(self.stub_os.path.should_exist, expect_files)
+ def test_copy_invalid_combos(self):
+ combos = [
+ (pack.CopyType.DIR, pack.CopyType.FILE, False),
+ (pack.CopyType.DIR, pack.CopyType.FILE, True),
+ (pack.CopyType.FILE, pack.CopyType.GLOB, False),
+ (pack.CopyType.FILE, pack.CopyType.GLOB, True),
+ (pack.CopyType.GLOB, pack.CopyType.FILE, False),
+ (pack.CopyType.GLOB, pack.CopyType.FILE, True),
+ ]
+ for tup in combos:
+ self.copy.src_type = tup[0]
+ self.copy.dst_type = tup[1]
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ with self.assertRaises(image_build.PathError):
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir)
+
+ def test_unmanaged_file(self):
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ self.copy.dst_type = pack.CopyType.FILE
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(self.copy.src,
+ 'system/bin/xyzzy.txt')]
+ self.stub_os.path.should_exist.append(
+ '/base/path/root/system/foozle/blag')
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ verbose=False)
+ # The metadata files should've been created.
+ expect_files = self.metadata_files
+ self.assertEqual(self.stub_os.path.should_exist, expect_files)
+
+ def test_unmanaged_dir_dir_file(self):
+ self.copy.dst = '/system/bin/xyzzy.txt'
+ self.copy.dst_type = pack.CopyType.FILE
+ simple_map = packmap_stub.StubPackMap(
+ destinations={self.copy.dst: [self.copy]})
+ target = target_stub.StubTarget(submaps=[simple_map])
+ self.stub_sysroot_generator.should_makedirs = [
+ self.stub_os.path.dirname(self.copy.dst)[1:]]
+ self.stub_sysroot_generator.should_add_file = [(self.copy.src,
+ 'system/bin/xyzzy.txt')]
+ self.stub_os.path.should_exist.append(
+ '/base/path/root/system/foozle/barzle/blag.mal')
+ image_build.CreateTargetCache(self.spec, target, self.cache_dir,
+ verbose=False)
+ expect_files = self.metadata_files
+ self.assertEqual(self.stub_os.path.should_exist, expect_files)
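
The byte string asserted in test_copy_system_with_custom_deep_file above is a single binary fs_config_files record for the custom file. A small decoding sketch; interpreting the second field as a mode and the 64-bit field as capabilities follows Android's fs_path_config_from_file layout and is an assumption here, not something stated in this change:

    import struct

    # The expected record, written as bytes so the sketch also runs on Python 3.
    record = (b'%\x00\x00\x01\xd2\x04\xe1\x10'
              b'\x00\x00\x00\x00\x00\x00\x00\x00'
              b'system/bin/xyzzy.txt\x00')
    length, mode, uid, gid, caps = struct.unpack('<HHHHQ', record[:16])
    path = record[16:-1]
    # length == 37 (the whole record), uid == 1234 and gid == 4321 (matching
    # copy.acl.user/group in the test), caps == 0, path == b'system/bin/xyzzy.txt'.
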
diff --git a/cli/lib/core/popen.py b/cli/lib/core/popen.py
index 785e1a4..d71fecf 100644
--- a/cli/lib/core/popen.py
+++ b/cli/lib/core/popen.py
@@ -21,7 +21,7 @@ import subprocess
def PopenPiped(args, **kwargs):
- """A wrapper around Popen with stdout/stderr pipes."""
- return subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, **kwargs)
+ """A wrapper around Popen with stdout/stderr pipes."""
+ return subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, **kwargs)
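
A short usage example for the wrapper above; the "from core import popen" line follows the import convention used by the other modules in this change and assumes cli/lib is on sys.path, and the echoed command is arbitrary:

    from core import popen

    proc = popen.PopenPiped(['echo', 'hello'])
    out, err = proc.communicate()   # both streams are captured rather than printed
    print(out)                      # 'hello\n'
    print(proc.returncode)          # 0
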
diff --git a/cli/lib/core/product.py b/cli/lib/core/product.py
index e50df49..7561d59 100644
--- a/cli/lib/core/product.py
+++ b/cli/lib/core/product.py
@@ -25,137 +25,142 @@ from core import util
class ProductCreatorError(Exception):
- pass
+ pass
class ProductCreator(object):
- def __init__(self, name, vendor, device):
- """Initializes the ProductCreators internal properties"""
- self._name = name
- self._config = None
- self._device = device
- self._vendor = vendor
-
- def exists(self):
- """Returns true if a subdirectory with the product name exists"""
- try:
- return os.path.exists(self._name)
- except OSError:
- return False
-
- def create(self):
- """Creates a subdirectory for the product and creates templates"""
- self._create_empty()
-
- # Populate the envsetup.sh file.
- env = Environment(self._config, self._name)
- env.envsetup()
-
- def _init_config(self):
- # Populate the configuration store
- self._config = config.ProductFileStore(os.path.abspath(self._name))
- self._config.name = self._name
- self._config.brand = 'Brillo'
- self._config.manufacturer = self._vendor
- self._config.manufacturer = self._vendor
- self._config.device = self._device
- self._config.bdk.buildtype = 'userdebug'
- self._config.bdk.version = util.GetBDKVersion()
- self._config.copy_files = """
+ def __init__(self, name, vendor, device):
+        """Initializes the ProductCreator's internal properties."""
+ self._name = name
+ self._config = None
+ self._device = device
+ self._vendor = vendor
+
+ def exists(self):
+ """Returns true if a subdirectory with the product name exists"""
+ try:
+ return os.path.exists(self._name)
+ except OSError:
+ return False
+
+ def create(self):
+ """Creates a subdirectory for the product and creates templates"""
+ self._create_empty()
+
+ # Populate the envsetup.sh file.
+ env = Environment(self._config, self._name)
+ env.envsetup()
+
+ def _init_config(self):
+ # Populate the configuration store
+ self._config = config.ProductFileStore(os.path.abspath(self._name))
+ self._config.name = self._name
+ self._config.brand = 'Brillo'
+        self._config.manufacturer = self._vendor
+ self._config.device = self._device
+ self._config.bdk.buildtype = 'userdebug'
+ self._config.bdk.version = util.GetBDKVersion()
+ self._config.copy_files = """
# Format:
# path-in-product-dir:path-to-install-in-device
# E.g., weaved.conf:system/etc/weaved/weaved.conf
- """
- self._config.brillo.product_id = (
- 'developer-boards:brillo-starter-board-<REPLACE ME>')
- self._config.brillo.crash_server = 'https://clients2.google.com/bc/report'
- # Building without Java isn't completely supported yet.
- # b/25281898
- self._config.bdk.java = '1'
-
- def _create_common(self):
- paths = [self._name, os.path.join(self._name, 'out'),
- os.path.join(self._name, 'sepolicy'),
- os.path.join(self._name, 'src')]
- for path in paths:
- if not os.path.exists(path):
- os.makedirs(path)
- self._init_config()
-
- with open(os.path.join(self._name, 'AndroidProducts.mk'), 'w') as f:
- f.write(product_templates.ANDROIDPRODUCTS_MK.substitute(
- self._config.dict()))
-
- def _create_empty(self):
- """Creates a subdirectory for the product and creates templates"""
- self._create_common()
- with open(os.path.join(self._name, ('%s.mk' % self._name)), 'w') as f:
- f.write(product_templates.PRODUCT_MK.substitute(self._config.dict()))
-
- # SELinux templates.
- sepolicy_dir = os.path.join(self._name, 'sepolicy')
- service_te_file = os.path.join(sepolicy_dir, '%s_service.te' % self._name)
-
- with open(service_te_file, 'w') as f:
- f.write(product_templates.SELINUX_DOMAIN.substitute(self._config.dict()))
-
- with open(os.path.join(sepolicy_dir, 'file_contexts'), 'w') as f:
- f.write(product_templates.SELINUX_FILE_CONTEXTS.substitute(
- self._config.dict()))
+ """
+ self._config.brillo.product_id = (
+ 'developer-boards:brillo-starter-board-<REPLACE ME>')
+ self._config.brillo.crash_server = (
+ 'https://clients2.google.com/bc/report')
+ # Building without Java isn't completely supported yet.
+ # b/25281898
+ self._config.bdk.java = '1'
+
+ def _create_common(self):
+ paths = [self._name, os.path.join(self._name, 'out'),
+ os.path.join(self._name, 'sepolicy'),
+ os.path.join(self._name, 'src')]
+ for path in paths:
+ if not os.path.exists(path):
+ os.makedirs(path)
+ self._init_config()
+
+ with open(os.path.join(self._name, 'AndroidProducts.mk'), 'w') as f:
+ f.write(product_templates.ANDROIDPRODUCTS_MK.substitute(
+ self._config.dict()))
+
+ def _create_empty(self):
+ """Creates a subdirectory for the product and creates templates"""
+ self._create_common()
+ with open(os.path.join(self._name, ('%s.mk' % self._name)), 'w') as f:
+ f.write(
+ product_templates.PRODUCT_MK.substitute(self._config.dict()))
+
+ # SELinux templates.
+ sepolicy_dir = os.path.join(self._name, 'sepolicy')
+ service_te_file = os.path.join(sepolicy_dir,
+ '%s_service.te' % self._name)
+
+ with open(service_te_file, 'w') as f:
+ f.write(
+ product_templates.SELINUX_DOMAIN.substitute(
+ self._config.dict()))
+
+ with open(os.path.join(sepolicy_dir, 'file_contexts'), 'w') as f:
+ f.write(product_templates.SELINUX_FILE_CONTEXTS.substitute(
+ self._config.dict()))
class Environment(object):
- """This class is used to generate a shell environment meant to streamline
- the use of the BDK while keeping it familiar to Android device developers.
-
- Note: None of the templates other than ENVSETUP should contain single
- quotes ('). If they do, it will break during sourcing of envsetup.sh.
- """
-
- def __init__(self, config_, product_dir=util.GetProductDir()):
- self._config = config_
- self._product_dir = product_dir
-
- def banner(self):
- """Return the banner to be printed after envsetup.sh is sourced"""
- bdk_path = util.GetBDKPath()
- bdk_version = util.GetBDKVersion()
- # TODO(b/25952629) Add support for a BDK warning if an update is needed.
- bdk_warning = ''
- return product_templates.BDK_BANNER.substitute(
- self._config.dict(),
- bdk_version=bdk_version,
- bdk_path=bdk_path,
- bdk_warning=bdk_warning)
-
- def exports(self):
- """Export shell environment variables for calling dev tools"""
- bdk_path = util.DEPRECATED_GetDefaultOSPath()
- return product_templates.ENV_EXPORTS.substitute(
- self._config.dict(),
- bdk_path=bdk_path,
- product_path=self._product_dir,
- cli_path=util.GetBDKPath('cli'),
- target_out=os.path.join(self._product_dir, 'out',
- 'out-%s' % (self._config.device),
- 'target', 'product', self._config.device)
- )
-
- def aliases(self):
- """Wrap bdk commands with shell functions"""
- return product_templates.ENV_ALIASES.substitute(
- self._config.dict(), product_path=self._product_dir)
-
- def environment(self):
- """Returns a complete shell environment for product development"""
- return self.banner() + self.exports() + self.aliases()
-
- def envsetup(self):
- """Generates a file which will run bdk product envsetup"""
- bdk_path = util.DEPRECATED_GetDefaultOSPath()
- tools_path = util.GetBDKPath()
- with open(os.path.join(self._product_dir, 'envsetup.sh'), 'w') as f:
- f.write(product_templates.ENVSETUP.substitute(
- self._config.dict(), bdk_path=bdk_path,
- product_path=self._product_dir,
- tools_path=tools_path))
+ """This class is used to generate a shell environment meant to streamline
+ the use of the BDK while keeping it familiar to Android device developers.
+
+ Note: None of the templates other than ENVSETUP should contain single
+ quotes ('). If they do, it will break during sourcing of envsetup.sh.
+ """
+
+ def __init__(self, config_, product_dir=util.GetProductDir()):
+ self._config = config_
+ self._product_dir = product_dir
+
+ def banner(self):
+ """Return the banner to be printed after envsetup.sh is sourced."""
+ bdk_path = util.GetBDKPath()
+ bdk_version = util.GetBDKVersion()
+ # TODO(b/25952629) Add support for a BDK warning if an update is needed.
+ bdk_warning = ''
+ return product_templates.BDK_BANNER.substitute(
+ self._config.dict(),
+ bdk_version=bdk_version,
+ bdk_path=bdk_path,
+ bdk_warning=bdk_warning)
+
+ def exports(self):
+ """Export shell environment variables for calling dev tools"""
+ bdk_path = util.DEPRECATED_GetDefaultOSPath()
+ return product_templates.ENV_EXPORTS.substitute(
+ self._config.dict(),
+ bdk_path=bdk_path,
+ product_path=self._product_dir,
+ cli_path=util.GetBDKPath('cli'),
+ target_out=os.path.join(self._product_dir, 'out',
+ 'out-%s' % (self._config.device),
+ 'target', 'product', self._config.device)
+ )
+
+ def aliases(self):
+ """Wrap bdk commands with shell functions"""
+ return product_templates.ENV_ALIASES.substitute(
+ self._config.dict(), product_path=self._product_dir)
+
+ def environment(self):
+ """Returns a complete shell environment for product development"""
+ return self.banner() + self.exports() + self.aliases()
+
+ def envsetup(self):
+ """Generates a file which will run bdk product envsetup"""
+ bdk_path = util.DEPRECATED_GetDefaultOSPath()
+ tools_path = util.GetBDKPath()
+ with open(os.path.join(self._product_dir, 'envsetup.sh'), 'w') as f:
+ f.write(product_templates.ENVSETUP.substitute(
+ self._config.dict(), bdk_path=bdk_path,
+ product_path=self._product_dir,
+ tools_path=tools_path))
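
The ProductCreator and Environment methods above all follow the same pattern: a template is filled in with the config store's dict plus a few extra keyword values via .substitute(), in the style of string.Template. The real templates live in product_templates, which is not part of this change, so the template text below is a hypothetical stand-in that only illustrates the call shape:

    from string import Template

    # Hypothetical banner template; the real BDK_BANNER lives in product_templates.
    BDK_BANNER = Template('Welcome to $name ($brand): BDK $bdk_version $bdk_warning')
    config_dict = {'name': 'my_product', 'brand': 'Brillo'}
    print(BDK_BANNER.substitute(config_dict, bdk_version='1.0', bdk_warning=''))
    # -> Welcome to my_product (Brillo): BDK 1.0
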
diff --git a/cli/lib/core/properties.py b/cli/lib/core/properties.py
index 2d14508..cc8793f 100644
--- a/cli/lib/core/properties.py
+++ b/cli/lib/core/properties.py
@@ -21,249 +21,253 @@ import string
class PropEntry(object):
- """Leaf node in the property tree representing the final value."""
- def __init__(self, name, full_key, legal_values, setter, getter):
- self._name = name
- self._full_key = full_key
- self._legal_values = legal_values
- self._getter = getter
- self._setter = setter
+ """Leaf node in the property tree representing the final value."""
+ def __init__(self, name, full_key, legal_values, setter, getter):
+ self._name = name
+ self._full_key = full_key
+ self._legal_values = legal_values
+ self._getter = getter
+ self._setter = setter
- def name(self):
- return self._name
+ def name(self):
+ return self._name
- def get(self):
- return self._getter(self._full_key)
+ def get(self):
+ return self._getter(self._full_key)
- def set(self, val):
- if self._legal_values and val not in self._legal_values:
- raise ValueError('Cannot set {} to {} (legal values are {}).'.format(
- self._full_key, val, self._legal_values))
- return self._setter(self._full_key, val)
+ def set(self, val):
+ if self._legal_values and val not in self._legal_values:
+ raise ValueError(
+ 'Cannot set {} to {} (legal values are {}).'.format(
+ self._full_key, val, self._legal_values))
+ return self._setter(self._full_key, val)
- def __repr__(self):
- return 'PropEntry(%s)' % self._name
+ def __repr__(self):
+ return 'PropEntry(%s)' % self._name
- def __str__(self):
- return 'PropEntry(name=%s)' % self._name
+ def __str__(self):
+ return 'PropEntry(name=%s)' % self._name
class PropGroup(object):
- """Group of groups or entries used by PropBase"""
- def __init__(self, name, parent):
- self._name = name
- self._parent = parent
- self._children = []
-
- def __setattr__(self, key, val):
- if key.startswith('_'):
- super(PropGroup, self).__setattr__(key, val)
- else:
- if self.has_child(key):
- c = self.child(key)
- if type(c) == PropEntry:
- return c.set(val)
- p = self._parent
- prefix = self.name()
- while type(p) == PropGroup:
- if p.name() != 'root':
- prefix = string.join([p.name(), prefix], '.')
- p = p.parent()
- raise AttributeError(
- "AttributeError: '{}' object has no attribute '{}'".format(
- type(p), string.join([prefix, key], '.')))
-
- def __getattr__(self, key):
- if key.startswith('_'):
- return super(PropGroup, self).__getattr__(key)
- entry = string.split(key, '/')
- if self.has_child(entry[0]):
- c = self.child(entry[0])
- if type(c) == PropEntry:
- return c.get()
- return c
- p = self._parent
- prefix = self._name
- while type(p) == PropGroup:
- if p.name() != 'root':
- prefix = string.join([p.name(), prefix], '.')
- p = p.parent()
- raise AttributeError(
- "AttributeError: '{}' object has no attribute '{}'".format(
- type(p), string.join([prefix, key], '.')))
-
- def name(self):
- return self._name
-
- def parent(self):
- return self._parent
-
- def children(self):
- return self._children
-
- def add_child(self, child):
- self._children.append(child)
-
- def child(self, child_name):
- for c in self._children:
- if c.name() == child_name:
- return c
- raise KeyError, child_name
-
- def has_child(self, child_name):
- for c in self._children:
- if c.name() == child_name:
- return True
- return False
+ """Group of groups or entries used by PropBase"""
+ def __init__(self, name, parent):
+ self._name = name
+ self._parent = parent
+ self._children = []
+
+ def __setattr__(self, key, val):
+ if key.startswith('_'):
+ super(PropGroup, self).__setattr__(key, val)
+ else:
+ if self.has_child(key):
+ c = self.child(key)
+ if type(c) == PropEntry:
+ return c.set(val)
+ p = self._parent
+ prefix = self.name()
+ while type(p) == PropGroup:
+ if p.name() != 'root':
+ prefix = string.join([p.name(), prefix], '.')
+ p = p.parent()
+ raise AttributeError(
+ "AttributeError: '{}' object has no attribute '{}'".format(
+ type(p), string.join([prefix, key], '.')))
+
+ def __getattr__(self, key):
+ if key.startswith('_'):
+ return super(PropGroup, self).__getattr__(key)
+ entry = string.split(key, '/')
+ if self.has_child(entry[0]):
+ c = self.child(entry[0])
+ if type(c) == PropEntry:
+ return c.get()
+ return c
+ p = self._parent
+ prefix = self._name
+ while type(p) == PropGroup:
+ if p.name() != 'root':
+ prefix = string.join([p.name(), prefix], '.')
+ p = p.parent()
+ raise AttributeError(
+ "AttributeError: '{}' object has no attribute '{}'".format(
+ type(p), string.join([prefix, key], '.')))
+
+ def name(self):
+ return self._name
+
+ def parent(self):
+ return self._parent
+
+ def children(self):
+ return self._children
+
+ def add_child(self, child):
+ self._children.append(child)
+
+ def child(self, child_name):
+ for c in self._children:
+ if c.name() == child_name:
+ return c
+ raise KeyError, child_name
+
+ def has_child(self, child_name):
+ for c in self._children:
+ if c.name() == child_name:
+ return True
+ return False
- def __repr__(self):
- parent = None
- if type(self._parent) == PropGroup:
- parent = self._parent.name()
- return "{{ name: '{}', parent: '{}', children: {} }}".format(
- self._name, parent, self._children)
+ def __repr__(self):
+ parent = None
+ if type(self._parent) == PropGroup:
+ parent = self._parent.name()
+ return "{{ name: '{}', parent: '{}', children: {} }}".format(
+ self._name, parent, self._children)
- def __str__(self):
- return 'PropGroup(name="%s")' % self._name
+ def __str__(self):
+ return 'PropGroup(name="%s")' % self._name
# pylint: disable=abstract-class-not-used
class PropBase(object):
- """PropBase - makes a list of path-like properties into attr interface
+ """PropBase - makes a list of path-like properties into attr interface
- This class provides a base class for creating arbitrary tree structures of
- properties using a path-like definition. One can set DESIRED_PROPS and
- REQUIRED_PROPS, as well as enable or disable CACHING. Only two methods need
- to be overriden: _save, and _load. For instance,
+ This class provides a base class for creating arbitrary tree structures of
+    properties using a path-like definition. One can set OPTIONAL_PROPS and
+    REQUIRED_PROPS, as well as enable or disable CACHING. Only two methods need
+    to be overridden: _save and _load. For instance,
- class PropDict(PropBase):
+ class PropDict(PropBase):
REQUIRED_PROPS = { 'editor/config/vim': [],
'editor/config/emacs': [],
'current_editor' : ['emacs', 'vim']
}
CACHING = True
- def __init__(self):
- self._d = {}
- super(PropDict, self).__init__()
-
- def _save(self, key, value):
- self._d[key] = value
- def _load(self, key):
- if not self._d.has_key(key):
- return ''
- return self._d[key]
-
- Usage then follows:
- p = PropDict()
- p.editor.config.vim = '$HOME/.vimrc'
- p.current_editor = 'vim'
- print p.editor.config.emacs
-
- Note! No properties may start with '_'.
- """
-
- # Fields of the store that can be accessed as properties.
- # { 'field_name': [], # Contains anything
- # 'restricted_field' : ['1', '0'] } # Only '0' or '1'
- REQUIRED_PROPS = {}
- OPTIONAL_PROPS = {}
-
- CACHING = False
-
- def __init__(self):
- self._root = self.create_prop_tree()
- self._cache = {}
-
- def complete(self):
- """Returns true if all expected properties are set."""
- for p in self.REQUIRED_PROPS.keys():
- # We use getattr instead of _load so that results get cached.
- if getattr(self, p) is None:
- return False
- return True
+ def __init__(self):
+ self._d = {}
+ super(PropDict, self).__init__()
+
+ def _save(self, key, value):
+ self._d[key] = value
+ def _load(self, key):
+ if not self._d.has_key(key):
+ return ''
+ return self._d[key]
- @classmethod
- def properties(cls):
- """Returns a dict of valid item keys and lists of validation values.
- Keys must be bare, e.g., 'foo', or a valid relative path, e.g.,
- 'foo/bar/baz'. Any intermediate values cannot contain values.
+ Usage then follows:
+ p = PropDict()
+ p.editor.config.vim = '$HOME/.vimrc'
+ p.current_editor = 'vim'
+ print p.editor.config.emacs
+
+ Note! No properties may start with '_'.
"""
- d = cls.OPTIONAL_PROPS.copy()
- d.update(cls.REQUIRED_PROPS)
- return d
- def _load(self, key):
- """Returns the correct value for the given |key|."""
- raise NotImplementedError
+ # Fields of the store that can be accessed as properties.
+ # { 'field_name': [], # Contains anything
+ # 'restricted_field' : ['1', '0'] } # Only '0' or '1'
+ REQUIRED_PROPS = {}
+ OPTIONAL_PROPS = {}
- def _save(self, key, val):
- """Sets the backing for the |key| to the given |val|.
+ CACHING = False
- Returns:
- The saved value, in case any manipultions were made.
- """
- raise NotImplementedError
-
- def __getattr__(self, key):
- if key.startswith('_'):
- return super(PropBase, self).__getattr__(key)
- entry = string.split(key, '/')
- if self._root.has_child(entry[0]):
- c = self._root.child(entry[0])
- if type(c) == PropEntry:
- result = None
- # If not self.CACHING, self._cache is empty so this is false.
- if c in self._cache:
- result = self._cache[c]
+ def __init__(self):
+ self._root = self.create_prop_tree()
+ self._cache = {}
+
+ def complete(self):
+ """Returns true if all expected properties are set."""
+ for p in self.REQUIRED_PROPS.keys():
+ # We use getattr instead of _load so that results get cached.
+ if getattr(self, p) is None:
+ return False
+ return True
+
+ @classmethod
+ def properties(cls):
+ """Returns a dict of valid item keys and lists of validation values.
+
+ Keys must be bare, e.g., 'foo', or a valid relative path, e.g.,
+        'foo/bar/baz'. Intermediate path components cannot hold values.
+ """
+ d = cls.OPTIONAL_PROPS.copy()
+ d.update(cls.REQUIRED_PROPS)
+ return d
+
+ def _load(self, key):
+ """Returns the correct value for the given |key|."""
+ raise NotImplementedError
+
+ def _save(self, key, val):
+ """Sets the backing for the |key| to the given |val|.
+
+ Returns:
+            The saved value, in case any manipulations were made.
+ """
+ raise NotImplementedError
+
+ def __getattr__(self, key):
+ if key.startswith('_'):
+ return super(PropBase, self).__getattr__(key)
+ entry = string.split(key, '/')
+ if self._root.has_child(entry[0]):
+ c = self._root.child(entry[0])
+ if type(c) == PropEntry:
+ result = None
+ # If not self.CACHING, self._cache is empty so this is false.
+ if c in self._cache:
+ result = self._cache[c]
+ else:
+ result = c.get()
+ # For space efficiency, only cache if desired.
+ if self.CACHING:
+ self._cache[c] = result
+ return result
+ return c
+ raise AttributeError(
+ "AttributeError: '%s' object has no attribute '%s'" % (type(self),
+ key))
+
+ def __setattr__(self, key, val):
+ if key.startswith('_'):
+ super(PropBase, self).__setattr__(key, val)
else:
- result = c.get()
- # For space efficiency, only cache if desired.
- if self.CACHING:
- self._cache[c] = result
- return result
- return c
- raise AttributeError, \
- "AttributeError: '%s' object has no attribute '%s'" % (type(self), key)
-
- def __setattr__(self, key, val):
- if key.startswith('_'):
- super(PropBase, self).__setattr__(key, val)
- else:
- if self._root.has_child(key):
- c = self._root.child(key)
- if type(c) == PropEntry:
- result = self._cache.get(c)
- if val != result:
- result = c.set(val)
- if self.CACHING:
- self._cache[c] = result
- return result
- raise AttributeError, \
- "AttributeError: '%s' object has no attribute '%s'" % (type(self), key)
-
- def create_prop_tree(self):
- """Creates a tree of PropGroups and PropEntrys from properties()"""
- root = PropGroup('root', self)
- for k, v in self.properties().iteritems():
- entry = string.split(k, '/')
- entry.reverse()
- component = root
- prefix = ''
- while len(entry) > 1:
- name = entry.pop()
- prefix = string.join([prefix, name], '.')
- if not component.has_child(name):
- component.add_child(PropGroup(name, component))
- component = component.child(name)
- # leaf
- component.add_child(PropEntry(entry[0],
- k, v,
- self._save, self._load))
- return root
-
- def dict(self):
- """Returns a fully populated a dict"""
- d = {}
- for p in self.properties():
- d[p] = self._load(p)
- return d
+ if self._root.has_child(key):
+ c = self._root.child(key)
+ if type(c) == PropEntry:
+ result = self._cache.get(c)
+ if val != result:
+ result = c.set(val)
+ if self.CACHING:
+ self._cache[c] = result
+ return result
+ raise AttributeError(
+ "AttributeError: '%s' object has no attribute '%s'" % (
+ type(self), key))
+
+ def create_prop_tree(self):
+ """Creates a tree of PropGroups and PropEntrys from properties()"""
+ root = PropGroup('root', self)
+ for k, v in self.properties().iteritems():
+ entry = string.split(k, '/')
+ entry.reverse()
+ component = root
+ prefix = ''
+ while len(entry) > 1:
+ name = entry.pop()
+ prefix = string.join([prefix, name], '.')
+ if not component.has_child(name):
+ component.add_child(PropGroup(name, component))
+ component = component.child(name)
+ # leaf
+ component.add_child(PropEntry(entry[0],
+ k, v,
+ self._save, self._load))
+ return root
+
+ def dict(self):
+ """Returns a fully populated a dict"""
+ d = {}
+ for p in self.properties():
+ d[p] = self._load(p)
+ return d
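
A minimal usage sketch of the PropBase attribute interface above, adapted from
the PropDict example in its docstring; the in-memory dict backing and the
"from core import properties" import path are assumptions for illustration:

    from core import properties

    class PropDict(properties.PropBase):
        # 'editor/config/vim' accepts anything; 'current_editor' is restricted.
        REQUIRED_PROPS = {'editor/config/vim': [],
                          'current_editor': ['emacs', 'vim']}
        CACHING = True

        def __init__(self):
            self._d = {}
            super(PropDict, self).__init__()

        def _save(self, key, value):
            self._d[key] = value
            return value

        def _load(self, key):
            return self._d.get(key, '')

    p = PropDict()
    p.editor.config.vim = '$HOME/.vimrc'  # path components become attributes.
    p.current_editor = 'vim'
    print p.editor.config.vim             # -> '$HOME/.vimrc'
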
diff --git a/cli/lib/core/provision.py b/cli/lib/core/provision.py
index a621456..c8f5258 100644
--- a/cli/lib/core/provision.py
+++ b/cli/lib/core/provision.py
@@ -27,136 +27,138 @@ import error
class Error(error.Error):
- """Base provision error class."""
+ """Base provision error class."""
class MissingBuildError(Error):
- """Raised when the required binaries have not yet been built."""
+ """Raised when the required binaries have not yet been built."""
SYSTEM_IMAGE_FILENAME = 'system.img'
class _ProvisionDir(object):
- """Creates a directory to run `provision-device` from.
+ """Creates a directory to run `provision-device` from.
- This replicates the platform build directory using symlinks, except
- that system.img will be replaced with a link to a custom file if
- one is available.
+ This replicates the platform build directory using symlinks, except
+ that system.img will be replaced with a link to a custom file if
+ one is available.
- This is meant to be used as a context manager so that it's easy to
- be sure the directory is cleaned up before exiting.
- """
-
- def __init__(self, product_build_out, system_image_dir=None):
- """Creates a temp dir with the necessary files to provision.
-
- Args:
- product_build_out: product build output directory.
- system_image_dir: directory containing the custom system.img file
- to use. If None or system.img does not exist, uses the one in
- platform_build_out.
+ This is meant to be used as a context manager so that it's easy to
+ be sure the directory is cleaned up before exiting.
"""
- self.temp_dir = tempfile.mkdtemp(prefix='bdk-flash-')
-
- try:
- # Look for a custom system.img we should use instead.
- custom_image_file = None
- if system_image_dir:
- image_path = os.path.join(system_image_dir, SYSTEM_IMAGE_FILENAME)
- if os.path.isfile(image_path):
- custom_image_file = image_path
-
- self._fill_temp_dir(product_build_out, custom_image_file)
- except:
- # Wipe the directory if anything goes wrong.
- self.cleanup()
- raise
-
- def _fill_temp_dir(self, product_build_out, custom_image_file):
- """Fills the temporary directory with the required symlinks."""
- # Not everything in product_build_out is necessary, but it's easier to
- # just link it all and provision-device will pick up whatever it needs.
- for filename in os.listdir(product_build_out):
- link_dst = os.path.join(self.temp_dir, filename)
-
- # Replace the default system.img if we have a custom one.
- if custom_image_file and filename == SYSTEM_IMAGE_FILENAME:
- link_src = custom_image_file
- else:
- link_src = os.path.join(product_build_out, filename)
-
- os.symlink(link_src, link_dst)
- def __enter__(self):
- return self.temp_dir
-
- def __exit__(self, exception_type, value, traceback):
- self.cleanup()
-
- def cleanup(self):
- shutil.rmtree(self.temp_dir, ignore_errors=True)
+ def __init__(self, product_build_out, system_image_dir=None):
+ """Creates a temp dir with the necessary files to provision.
+
+ Args:
+ product_build_out: product build output directory.
+ system_image_dir: directory containing the custom system.img file
+ to use. If None or system.img does not exist, uses the one in
+ platform_build_out.
+ """
+ self.temp_dir = tempfile.mkdtemp(prefix='bdk-flash-')
+
+ try:
+ # Look for a custom system.img we should use instead.
+ custom_image_file = None
+ if system_image_dir:
+ image_path = os.path.join(system_image_dir,
+ SYSTEM_IMAGE_FILENAME)
+ if os.path.isfile(image_path):
+ custom_image_file = image_path
+
+ self._fill_temp_dir(product_build_out, custom_image_file)
+ except:
+ # Wipe the directory if anything goes wrong.
+ self.cleanup()
+ raise
+
+ def _fill_temp_dir(self, product_build_out, custom_image_file):
+ """Fills the temporary directory with the required symlinks."""
+ # Not everything in product_build_out is necessary, but it's easier to
+ # just link it all and provision-device will pick up whatever it needs.
+ for filename in os.listdir(product_build_out):
+ link_dst = os.path.join(self.temp_dir, filename)
+
+ # Replace the default system.img if we have a custom one.
+ if custom_image_file and filename == SYSTEM_IMAGE_FILENAME:
+ link_src = custom_image_file
+ else:
+ link_src = os.path.join(product_build_out, filename)
+
+ os.symlink(link_src, link_dst)
+
+ def __enter__(self):
+ return self.temp_dir
+
+ def __exit__(self, exception_type, value, traceback):
+ self.cleanup()
+
+ def cleanup(self):
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
def _get_provision_tool(os_source_dir, platform_build_out, bsp):
- """Creates a ProvisionDeviceTool with proper settings.
+ """Creates a ProvisionDeviceTool with proper settings.
- Args:
- os_source_dir: the Brillo source root, needed by provision to
- locate prebuilt vendor binaries.
- platform_build_out: platform build output directory.
- bsp: BSP name.
+ Args:
+ os_source_dir: the Brillo source root, needed by provision to
+ locate prebuilt vendor binaries.
+ platform_build_out: platform build output directory.
+ bsp: BSP name.
- Returns:
- The created ProvisionDeviceTool.
+ Returns:
+ The created ProvisionDeviceTool.
- Raises:
- MissingBuildError: the platform has not been built yet.
- """
- # The provision-device script needs fastboot to be on PATH, so we create
- # a fastboot tool to add it's parent directory to the provision tool PATH.
- fastboot_tool = tool.HostToolWrapper('fastboot', platform_build_out)
- provision_tool = tool.ProvisionDeviceTool(
- os_source_dir, platform_build_out, bsp,
- env={'PATH': os.path.dirname(fastboot_tool.path())})
+ Raises:
+ MissingBuildError: the platform has not been built yet.
+ """
+ # The provision-device script needs fastboot to be on PATH, so we create
+    # a fastboot tool to add its parent directory to the provision tool PATH.
+ fastboot_tool = tool.HostToolWrapper('fastboot', platform_build_out)
+ provision_tool = tool.ProvisionDeviceTool(
+ os_source_dir, platform_build_out, bsp,
+ env={'PATH': os.path.dirname(fastboot_tool.path())})
- for t in (fastboot_tool, provision_tool):
- if not t.exists():
- raise MissingBuildError(
- '{} does not exist; use `bdk build platform` first'.format(t.path()))
+ for t in (fastboot_tool, provision_tool):
+ if not t.exists():
+ raise MissingBuildError(
+ '{} does not exist; use `bdk build platform` first'.format(
+ t.path()))
- return provision_tool
+ return provision_tool
def provision_device(target, platform_build_out, system_image_dir=None,
provision_args=None):
- """Provisions the attached device using the `provision-device` script.
-
- Requires that the platform has been built so that the image files,
- fastboot, and provision-device all exist in platform_build_out.
-
- Args:
- target: the project Target object.
- platform_build_out: the root of the platform build output.
- system_image_dir: directory containing the custom system.img to
- flash. If None or the system.img file does not exist, uses
- the default system.img from platform_build_out instead.
- provision_args: list of string args to pass to provision-device.
-
- Returns:
- The `provision-device` exit code.
-
- Raises:
- MissingBuildError: the platform has not been built yet.
- core.util.OSVersionError: the target requests an invalid os version.
- """
- os_source_dir = util.GetOSPath(target.os_version)
- provision_tool = _get_provision_tool(os_source_dir, platform_build_out,
- target.board)
-
- # Combine the platform ANDROID_PRODUCT_OUT directory with our system.img.
- with _ProvisionDir(provision_tool.environment['ANDROID_PRODUCT_OUT'],
- system_image_dir) as provision_dir:
- # Point provision-device to the new directory.
- provision_tool.environment['ANDROID_PRODUCT_OUT'] = provision_dir
- return provision_tool.run(provision_args)
+ """Provisions the attached device using the `provision-device` script.
+
+ Requires that the platform has been built so that the image files,
+ fastboot, and provision-device all exist in platform_build_out.
+
+ Args:
+ target: the project Target object.
+ platform_build_out: the root of the platform build output.
+ system_image_dir: directory containing the custom system.img to
+ flash. If None or the system.img file does not exist, uses
+ the default system.img from platform_build_out instead.
+ provision_args: list of string args to pass to provision-device.
+
+ Returns:
+ The `provision-device` exit code.
+
+ Raises:
+ MissingBuildError: the platform has not been built yet.
+ core.util.OSVersionError: the target requests an invalid os version.
+ """
+ os_source_dir = util.GetOSPath(target.os_version)
+ provision_tool = _get_provision_tool(os_source_dir, platform_build_out,
+ target.board)
+
+ # Combine the platform ANDROID_PRODUCT_OUT directory with our system.img.
+ with _ProvisionDir(provision_tool.environment['ANDROID_PRODUCT_OUT'],
+ system_image_dir) as provision_dir:
+ # Point provision-device to the new directory.
+ provision_tool.environment['ANDROID_PRODUCT_OUT'] = provision_dir
+ return provision_tool.run(provision_args)
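
As a quick orientation to provision_device() above, a hypothetical call could
look like the following; the import path, the output directory, and the custom
image directory are illustrative placeholders:

    from core import provision

    # 'target' is the project Target object the caller already holds.
    ret = provision.provision_device(
        target,
        '/path/to/platform/out',             # platform build output root
        system_image_dir='/path/to/images',  # optional custom system.img dir
        provision_args=None)                 # extra args for provision-device
    # ret is the provision-device exit code; MissingBuildError is raised if
    # the platform has not been built yet.
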
diff --git a/cli/lib/core/provision_unittest.py b/cli/lib/core/provision_unittest.py
index fd91115..d5dc10b 100644
--- a/cli/lib/core/provision_unittest.py
+++ b/cli/lib/core/provision_unittest.py
@@ -29,181 +29,189 @@ from test import stubs
class ProvisionDeviceTest(unittest.TestCase):
- """Tests for provision_device()."""
-
- _BSP = 'boardname'
- _CUSTOM_SYSTEM_IMAGE_DIR = '/foo'
- _CUSTOM_SYSTEM_IMAGE_PATH = os.path.join(_CUSTOM_SYSTEM_IMAGE_DIR,
- 'system.img')
- _HOST_ARCH = 'foo_arch'
- _PLATFORM_BUILD_OUT = '/platform'
-
- # Necessary paths derived from our constants.
- _PRODUCT_BUILD_OUT = util.GetAndroidProductOut(_PLATFORM_BUILD_OUT, _BSP)
- _PROVISION_DEVICE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'provision-device')
- _DEFAULT_SYSTEM_IMAGE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'system.img')
-
- _HOST_BUILD_OUT = os.path.join(_PLATFORM_BUILD_OUT, 'host', _HOST_ARCH)
- _FASTBOOT_PATH = os.path.join(_HOST_BUILD_OUT, 'bin', 'fastboot')
-
- def setUp(self):
- """Overrides imported modules with stubs."""
- self.stub_os = stubs.StubOs()
- self.stub_shutil = stubs.StubShutil(self.stub_os)
- self.stub_tempfile = stubs.StubTempfile(self.stub_os)
- self.stub_subprocess = stubs.StubSubprocess()
- self.stub_util = util_stub.StubUtil(arch=self._HOST_ARCH)
-
- provision.os = self.stub_os
- provision.shutil = self.stub_shutil
- provision.tempfile = self.stub_tempfile
- provision.util = self.stub_util
-
- provision.tool.os = self.stub_os
- provision.tool.subprocess = self.stub_subprocess
- provision.tool.util = self.stub_util
-
- self.target = target_stub.StubTarget(
- board=self._BSP, os_version=self.stub_util.GetOSVersion())
-
- # Give more obvious names to some commonly used variables.
- self.provision_temp_dir = self.stub_tempfile.temp_dir
- self.system_image_link = self.stub_os.path.join(self.provision_temp_dir,
- 'system.img')
-
- def tearDown(self):
- """Performs shared end-of-test logic."""
- # No matter the result, the temp dir should not exist afterwards.
- self.assertFalse(self.stub_os.path.exists(self.provision_temp_dir))
- self.assertFalse(self.stub_os.path.isdir(self.provision_temp_dir))
-
- def prepare_os_files(self):
- """Creates the necessary files for provision_device()."""
- self.stub_os.path.should_be_dir.append(self._PRODUCT_BUILD_OUT)
- self.stub_os.path.should_exist += [self._FASTBOOT_PATH,
- self._PROVISION_DEVICE_PATH,
- self._CUSTOM_SYSTEM_IMAGE_PATH]
- self.stub_os.should_create_link.append((
- self._PROVISION_DEVICE_PATH,
- os.path.join(self.provision_temp_dir, 'provision-device')))
- self.stub_os.should_makedirs.append(self.provision_temp_dir)
-
- def test_call(self):
- """Tests a successful provision-device call."""
- self.prepare_os_files()
- command = self.stub_subprocess.AddCommand()
-
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
- # provision-device environment must have:
- # * PATH contain the fastboot executable.
- # * ANDROID_BUILD_TOP point to the OS source root.
- # * ANDROID_PRODUCT_OUT point to our temporary product dir.
- command.AssertCallWas(
- [self._PROVISION_DEVICE_PATH], shell=False, cwd=None,
- stdout=None, stderr=None,
- env={'PATH': self.stub_os.path.dirname(self._FASTBOOT_PATH),
- 'ANDROID_BUILD_TOP': self.stub_util.GetOSPath(
- self.target.os_version),
- 'ANDROID_HOST_OUT': self._HOST_BUILD_OUT,
- 'ANDROID_PRODUCT_OUT': self.provision_temp_dir})
-
- def test_caller_env_path(self):
- """Tests that provision-device can access the caller's PATH."""
- self.prepare_os_files()
- self.stub_os.environ['PATH'] = '/foo/bar:/baz'
- command = self.stub_subprocess.AddCommand()
-
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
- self.assertEqual(
- self.stub_os.path.dirname(self._FASTBOOT_PATH) + ':/foo/bar:/baz',
- command.GetCallArgs()[1]['env']['PATH'])
-
- def test_product_build_symlinks(self):
- """Tests that the temp dir symlinks are generated."""
- self.prepare_os_files()
- # Put a few extra files in the product build directory as symlink targets.
- file_names = ('foo.txt', 'system.img', 'cache', 'bar000')
- for name in file_names:
- path = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name)
- self.stub_os.path.should_exist.append(path)
- self.stub_os.should_create_link.append((
- path, os.path.join(self.provision_temp_dir, name)))
-
- # Helper function to check that symlinks exist.
- def check_symlinks():
- for name in file_names:
- link_target = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name)
- link_path = self.stub_os.path.join(self.provision_temp_dir, name)
- self.assertEqual(link_target, self.stub_os.readlink(link_path))
-
- # Run the symlink check as a side effect of the subprocess because by the
- # time we get control back here the temp dir will have been cleaned up.
- self.stub_subprocess.AddCommand(side_effect=check_symlinks)
-
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
-
- def test_custom_system_image(self):
- """Tests that a custom system.img is used if provided."""
- self.prepare_os_files()
- self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH)
- self.stub_os.should_create_link.append((
- self._CUSTOM_SYSTEM_IMAGE_PATH,
- os.path.join(self.provision_temp_dir, 'system.img')))
-
- # Check the link points to the custom image, not the default system image.
- def check_custom_image_link():
- self.assertEqual(self._CUSTOM_SYSTEM_IMAGE_PATH,
- self.stub_os.readlink(self.system_image_link))
- self.stub_subprocess.AddCommand(side_effect=check_custom_image_link)
-
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT,
- system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR)
-
- def test_custom_system_image_missing(self):
- """Tests specifying a nonexistent custom system.img."""
- self.prepare_os_files()
- self.stub_os.path.should_exist.remove(self._CUSTOM_SYSTEM_IMAGE_PATH)
- self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH)
- self.stub_os.should_create_link.append((
- self._DEFAULT_SYSTEM_IMAGE_PATH,
- os.path.join(self.provision_temp_dir, 'system.img')))
-
- # In this case, even though the manifest may give a potential output
- # directory, the user has not built a custom system.img yet, so we should
- # be linking to the default system.img.
- def check_default_image_link():
- self.assertEqual(self._DEFAULT_SYSTEM_IMAGE_PATH,
- self.stub_os.readlink(self.system_image_link))
- self.stub_subprocess.AddCommand(side_effect=check_default_image_link)
-
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT,
- system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR)
-
- def test_provision_exit_code(self):
- """Tests non-zero exit codes are propagated."""
- self.prepare_os_files()
- # Test an errno other than the default.
- errno = error.GENERIC_ERRNO + 1
- self.stub_subprocess.AddCommand(ret_code=errno)
-
- with self.assertRaises(error.Error) as e:
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
- self.assertEqual(errno, e.exception.errno)
-
- def test_fastboot_missing(self):
- """Tests failure when fastboot is missing."""
- self.prepare_os_files()
- self.stub_os.path.should_exist.remove(self._FASTBOOT_PATH)
- self.stub_subprocess.AddCommand()
-
- with self.assertRaises(provision.MissingBuildError):
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
-
- def test_provision_device_missing(self):
- """Tests failure when provision-device is missing."""
- self.prepare_os_files()
- self.stub_os.path.should_exist.remove(self._PROVISION_DEVICE_PATH)
- self.stub_subprocess.AddCommand()
-
- with self.assertRaises(provision.MissingBuildError):
- provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+ """Tests for provision_device()."""
+
+ _BSP = 'boardname'
+ _CUSTOM_SYSTEM_IMAGE_DIR = '/foo'
+ _CUSTOM_SYSTEM_IMAGE_PATH = os.path.join(_CUSTOM_SYSTEM_IMAGE_DIR,
+ 'system.img')
+ _HOST_ARCH = 'foo_arch'
+ _PLATFORM_BUILD_OUT = '/platform'
+
+ # Necessary paths derived from our constants.
+ _PRODUCT_BUILD_OUT = util.GetAndroidProductOut(_PLATFORM_BUILD_OUT, _BSP)
+ _PROVISION_DEVICE_PATH = os.path.join(_PRODUCT_BUILD_OUT,
+ 'provision-device')
+ _DEFAULT_SYSTEM_IMAGE_PATH = os.path.join(_PRODUCT_BUILD_OUT, 'system.img')
+
+ _HOST_BUILD_OUT = os.path.join(_PLATFORM_BUILD_OUT, 'host', _HOST_ARCH)
+ _FASTBOOT_PATH = os.path.join(_HOST_BUILD_OUT, 'bin', 'fastboot')
+
+ def setUp(self):
+ """Overrides imported modules with stubs."""
+ self.stub_os = stubs.StubOs()
+ self.stub_shutil = stubs.StubShutil(self.stub_os)
+ self.stub_tempfile = stubs.StubTempfile(self.stub_os)
+ self.stub_subprocess = stubs.StubSubprocess()
+ self.stub_util = util_stub.StubUtil(arch=self._HOST_ARCH)
+
+ provision.os = self.stub_os
+ provision.shutil = self.stub_shutil
+ provision.tempfile = self.stub_tempfile
+ provision.util = self.stub_util
+
+ provision.tool.os = self.stub_os
+ provision.tool.subprocess = self.stub_subprocess
+ provision.tool.util = self.stub_util
+
+ self.target = target_stub.StubTarget(
+ board=self._BSP, os_version=self.stub_util.GetOSVersion())
+
+ # Give more obvious names to some commonly used variables.
+ self.provision_temp_dir = self.stub_tempfile.temp_dir
+ self.system_image_link = self.stub_os.path.join(self.provision_temp_dir,
+ 'system.img')
+
+ def tearDown(self):
+ """Performs shared end-of-test logic."""
+ # No matter the result, the temp dir should not exist afterwards.
+ self.assertFalse(self.stub_os.path.exists(self.provision_temp_dir))
+ self.assertFalse(self.stub_os.path.isdir(self.provision_temp_dir))
+
+ def prepare_os_files(self):
+ """Creates the necessary files for provision_device()."""
+ self.stub_os.path.should_be_dir.append(self._PRODUCT_BUILD_OUT)
+ self.stub_os.path.should_exist += [self._FASTBOOT_PATH,
+ self._PROVISION_DEVICE_PATH,
+ self._CUSTOM_SYSTEM_IMAGE_PATH]
+ self.stub_os.should_create_link.append((
+ self._PROVISION_DEVICE_PATH,
+ os.path.join(self.provision_temp_dir, 'provision-device')))
+ self.stub_os.should_makedirs.append(self.provision_temp_dir)
+
+ def test_call(self):
+ """Tests a successful provision-device call."""
+ self.prepare_os_files()
+ command = self.stub_subprocess.AddCommand()
+
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+ # provision-device environment must have:
+ # * PATH contain the fastboot executable.
+ # * ANDROID_BUILD_TOP point to the OS source root.
+ # * ANDROID_PRODUCT_OUT point to our temporary product dir.
+ command.AssertCallWas(
+ [self._PROVISION_DEVICE_PATH], shell=False, cwd=None,
+ stdout=None, stderr=None,
+ env={'PATH': self.stub_os.path.dirname(self._FASTBOOT_PATH),
+ 'ANDROID_BUILD_TOP': self.stub_util.GetOSPath(
+ self.target.os_version),
+ 'ANDROID_HOST_OUT': self._HOST_BUILD_OUT,
+ 'ANDROID_PRODUCT_OUT': self.provision_temp_dir})
+
+ def test_caller_env_path(self):
+ """Tests that provision-device can access the caller's PATH."""
+ self.prepare_os_files()
+ self.stub_os.environ['PATH'] = '/foo/bar:/baz'
+ command = self.stub_subprocess.AddCommand()
+
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+ self.assertEqual(
+ self.stub_os.path.dirname(self._FASTBOOT_PATH) + ':/foo/bar:/baz',
+ command.GetCallArgs()[1]['env']['PATH'])
+
+ def test_product_build_symlinks(self):
+ """Tests that the temp dir symlinks are generated."""
+ self.prepare_os_files()
+ # Put a few extra files in the product build directory as symlink
+ # targets.
+ file_names = ('foo.txt', 'system.img', 'cache', 'bar000')
+ for name in file_names:
+ path = self.stub_os.path.join(self._PRODUCT_BUILD_OUT, name)
+ self.stub_os.path.should_exist.append(path)
+ self.stub_os.should_create_link.append((
+ path, os.path.join(self.provision_temp_dir, name)))
+
+ # Helper function to check that symlinks exist.
+ def check_symlinks():
+ for name in file_names:
+ link_target = self.stub_os.path.join(self._PRODUCT_BUILD_OUT,
+ name)
+ link_path = self.stub_os.path.join(self.provision_temp_dir,
+ name)
+ self.assertEqual(link_target, self.stub_os.readlink(link_path))
+
+ # Run the symlink check as a side effect of the subprocess because by
+ # the time we get control back here the temp dir will have been cleaned
+ # up.
+ self.stub_subprocess.AddCommand(side_effect=check_symlinks)
+
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+
+ def test_custom_system_image(self):
+ """Tests that a custom system.img is used if provided."""
+ self.prepare_os_files()
+ self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH)
+ self.stub_os.should_create_link.append((
+ self._CUSTOM_SYSTEM_IMAGE_PATH,
+ os.path.join(self.provision_temp_dir, 'system.img')))
+
+ # Check the link points to the custom image, not the default system
+ # image.
+ def check_custom_image_link():
+ self.assertEqual(self._CUSTOM_SYSTEM_IMAGE_PATH,
+ self.stub_os.readlink(self.system_image_link))
+ self.stub_subprocess.AddCommand(side_effect=check_custom_image_link)
+
+ provision.provision_device(
+ self.target, self._PLATFORM_BUILD_OUT,
+ system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR)
+
+ def test_custom_system_image_missing(self):
+ """Tests specifying a nonexistent custom system.img."""
+ self.prepare_os_files()
+ self.stub_os.path.should_exist.remove(self._CUSTOM_SYSTEM_IMAGE_PATH)
+ self.stub_os.path.should_exist.append(self._DEFAULT_SYSTEM_IMAGE_PATH)
+ self.stub_os.should_create_link.append((
+ self._DEFAULT_SYSTEM_IMAGE_PATH,
+ os.path.join(self.provision_temp_dir, 'system.img')))
+
+ # In this case, even though the manifest may give a potential output
+ # directory, the user has not built a custom system.img yet, so we
+ # should be linking to the default system.img.
+ def check_default_image_link():
+ self.assertEqual(self._DEFAULT_SYSTEM_IMAGE_PATH,
+ self.stub_os.readlink(self.system_image_link))
+ self.stub_subprocess.AddCommand(side_effect=check_default_image_link)
+
+ provision.provision_device(
+ self.target, self._PLATFORM_BUILD_OUT,
+ system_image_dir=self._CUSTOM_SYSTEM_IMAGE_DIR)
+
+ def test_provision_exit_code(self):
+ """Tests non-zero exit codes are propagated."""
+ self.prepare_os_files()
+ # Test an errno other than the default.
+ errno = error.GENERIC_ERRNO + 1
+ self.stub_subprocess.AddCommand(ret_code=errno)
+
+ with self.assertRaises(error.Error) as e:
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+ self.assertEqual(errno, e.exception.errno)
+
+ def test_fastboot_missing(self):
+ """Tests failure when fastboot is missing."""
+ self.prepare_os_files()
+ self.stub_os.path.should_exist.remove(self._FASTBOOT_PATH)
+ self.stub_subprocess.AddCommand()
+
+ with self.assertRaises(provision.MissingBuildError):
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
+
+ def test_provision_device_missing(self):
+ """Tests failure when provision-device is missing."""
+ self.prepare_os_files()
+ self.stub_os.path.should_exist.remove(self._PROVISION_DEVICE_PATH)
+ self.stub_subprocess.AddCommand()
+
+ with self.assertRaises(provision.MissingBuildError):
+ provision.provision_device(self.target, self._PLATFORM_BUILD_OUT)
diff --git a/cli/lib/core/timer.py b/cli/lib/core/timer.py
index 3a5ca9a..3081c92 100644
--- a/cli/lib/core/timer.py
+++ b/cli/lib/core/timer.py
@@ -21,69 +21,71 @@ import time
class Timer(object):
- """A simple object to time various things.
-
- Attributes:
- name - The name of what is being timed.
- label - (optional) An additional text label for the timing. Default None.
- """
-
- _all_timers = []
- _group_paused_timers = []
-
- @classmethod
- def MassPauseRunningTimers(cls):
- """Pauses all running timers."""
- timers_to_pause = [t for t in cls._all_timers if t.IsRunning()]
- for t in timers_to_pause:
- t.Stop()
- cls._group_paused_timers += timers_to_pause
-
- @classmethod
- def ResumeMassPausedTimers(cls):
- """Resumes all timers that were paused using cls.MassPauseRunningTimers()"""
- for t in cls._group_paused_timers:
- t.Start()
- cls._group_paused_timers = []
-
- def __init__(self, name, label=None):
- self._all_timers.append(self)
- self.name = name
- self.label = label
- self._start_time = None
- self.Reset()
-
- def Reset(self):
- """Resets the timer, stopping it and setting it to 0."""
- self._time = 0
- self._start_time = None
-
- def Start(self):
- """Starts the timer continuing from whatever time is showing.
-
- Does nothing to a timer that is already running.
- """
- if self._start_time is None:
- self._start_time = time.time()
-
- def Stop(self):
- """Stops the timer, leaving its current amount of time showing."""
- if self._start_time is not None:
- self._time += time.time() - self._start_time
- self._start_time = None
-
- def IsRunning(self):
- return self._start_time is not None
-
- def Read(self):
- """Read the current time of the timer in seconds with float precision.
+ """A simple object to time various things.
- Works while running, but best practice would be to Stop first.
+ Attributes:
+ name: The name of what is being timed.
+ label: (optional) An additional text label for the timing. Default None.
"""
- # Amount of time previously timed.
- result = self._time
- # If currently running, look at the time elapsed since last stop.
- if self._start_time is not None:
- result += time.time() - self._start_time
- return result
+ _all_timers = []
+ _group_paused_timers = []
+
+ @classmethod
+ def MassPauseRunningTimers(cls):
+ """Pauses all running timers."""
+ timers_to_pause = [t for t in cls._all_timers if t.IsRunning()]
+ for t in timers_to_pause:
+ t.Stop()
+ cls._group_paused_timers += timers_to_pause
+
+ @classmethod
+ def ResumeMassPausedTimers(cls):
+ """Resumes all timers that were paused using
+ cls.MassPauseRunningTimers()
+ """
+ for t in cls._group_paused_timers:
+ t.Start()
+ cls._group_paused_timers = []
+
+ def __init__(self, name, label=None):
+ self._all_timers.append(self)
+ self.name = name
+ self.label = label
+ self._start_time = None
+ self.Reset()
+
+ def Reset(self):
+ """Resets the timer, stopping it and setting it to 0."""
+ self._time = 0
+ self._start_time = None
+
+ def Start(self):
+ """Starts the timer continuing from whatever time is showing.
+
+ Does nothing to a timer that is already running.
+ """
+ if self._start_time is None:
+ self._start_time = time.time()
+
+ def Stop(self):
+ """Stops the timer, leaving its current amount of time showing."""
+ if self._start_time is not None:
+ self._time += time.time() - self._start_time
+ self._start_time = None
+
+ def IsRunning(self):
+ return self._start_time is not None
+
+ def Read(self):
+ """Read the current time of the timer in seconds with float precision.
+
+ Works while running, but best practice would be to Stop first.
+ """
+ # Amount of time previously timed.
+ result = self._time
+ # If currently running, look at the time elapsed since last stop.
+ if self._start_time is not None:
+ result += time.time() - self._start_time
+
+ return result
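
Typical Timer usage, as exercised by the unit tests below; the timer name,
label, and printed message are illustrative only:

    from core import timer

    t = timer.Timer('platform build', label='eng')
    t.Start()
    # ... timed work ...
    t.Stop()
    print 'platform build took %.2fs' % t.Read()  # additive across Start/Stop

    # Pause every running timer around a blocking operation, then resume them.
    timer.Timer.MassPauseRunningTimers()
    # ... e.g. wait for user input ...
    timer.Timer.ResumeMassPausedTimers()
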
diff --git a/cli/lib/core/timer_unittest.py b/cli/lib/core/timer_unittest.py
index b8375f7..dd32a65 100644
--- a/cli/lib/core/timer_unittest.py
+++ b/cli/lib/core/timer_unittest.py
@@ -25,113 +25,113 @@ from test import stubs
class TimerTest(unittest.TestCase):
- def setUp(self):
- self.stub_time = stubs.StubTime()
-
- timer.time = self.stub_time
-
- def RunningTimerWithTime(self, time):
- """Returns a timer running with a given amount of time already on it.
-
- The timer will have been started at stub_time = 0.
- """
- self.stub_time.current_time = 0
- t = timer.Timer('event')
- t.Start()
- self.stub_time.current_time = time
- # Confirm that this is indeed a running timer with the desired time.
- self.assertTrue(t.IsRunning())
- self.assertAlmostEqual(t.Read(), time)
- return t
-
- def StoppedTimerWithTime(self, time):
- """Returns a timer stopped with a given amount of time already on it."""
- t = self.RunningTimerWithTime(time)
- t.Stop()
- # Confirm that this is indeed a stopped timer with the desired time.
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), time)
- return t
-
- def test_init(self):
- # Time at init shouldn't matter.
- self.stub_time.current_time = 123.45
- t = timer.Timer('event')
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 0)
-
- def test_stopped_read(self):
- t = self.StoppedTimerWithTime(12.3)
- self.assertAlmostEqual(t.Read(), 12.3)
- # Reading shouldn't change running state.
- self.assertFalse(t.IsRunning())
-
- # Even if time is progressed, it should still read the same value.
- self.stub_time.current_time = 45.6
- self.assertAlmostEqual(t.Read(), 12.3)
-
- def test_stopped_start(self):
- t = self.StoppedTimerWithTime(12.3)
- t.Start()
- self.assertTrue(t.IsRunning())
- # Should not reset time.
- self.assertAlmostEqual(t.Read(), 12.3)
-
- def test_stopped_reset(self):
- t = self.StoppedTimerWithTime(12.3)
- t.Reset()
- # Reset should stop and set to 0.
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 0)
-
- def test_stopped_stop(self):
- t = self.StoppedTimerWithTime(12.3)
- t.Stop()
- # Stop should have no further effect.
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 12.3)
-
- def test_running_read(self):
- t = self.RunningTimerWithTime(12.3)
- self.assertAlmostEqual(t.Read(), 12.3)
- # Read should not change running state.
- self.assertTrue(t.IsRunning())
-
- # If time is progressed, it should read the appropriate value.
- self.stub_time.current_time = 45.6
- self.assertAlmostEqual(t.Read(), 45.6)
-
- def test_running_start(self):
- t = self.RunningTimerWithTime(12.3)
- t.Start()
- # Should have no effect.
- self.assertTrue(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 12.3)
-
- def test_running_reset(self):
- t = self.RunningTimerWithTime(12.3)
- t.Reset()
- # Reset should stop and set to 0.
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 0)
-
- def test_running_stop(self):
- t = self.RunningTimerWithTime(12.3)
- t.Stop()
- # Stop should stop but maintain time.
- self.assertFalse(t.IsRunning())
- self.assertAlmostEqual(t.Read(), 12.3)
-
- def test_combined_read(self):
- t = self.StoppedTimerWithTime(12.3)
- self.stub_time.current_time = 20
- t.Start()
- self.stub_time.current_time = 45.6
- expected_read_time = 12.3 + (45.6 - 20)
- # When a timer has been previously stopped, current running time should be
- # additive.
- self.assertAlmostEqual(t.Read(), expected_read_time)
- # Stopping again shouldn't overwrite this number.
- t.Stop()
- self.stub_time.current_time = 56.7
- self.assertAlmostEqual(t.Read(), expected_read_time)
+ def setUp(self):
+ self.stub_time = stubs.StubTime()
+
+ timer.time = self.stub_time
+
+ def RunningTimerWithTime(self, time):
+ """Returns a timer running with a given amount of time already on it.
+
+ The timer will have been started at stub_time = 0.
+ """
+ self.stub_time.current_time = 0
+ t = timer.Timer('event')
+ t.Start()
+ self.stub_time.current_time = time
+ # Confirm that this is indeed a running timer with the desired time.
+ self.assertTrue(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), time)
+ return t
+
+ def StoppedTimerWithTime(self, time):
+ """Returns a timer stopped with a given amount of time already on it."""
+ t = self.RunningTimerWithTime(time)
+ t.Stop()
+ # Confirm that this is indeed a stopped timer with the desired time.
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), time)
+ return t
+
+ def test_init(self):
+ # Time at init shouldn't matter.
+ self.stub_time.current_time = 123.45
+ t = timer.Timer('event')
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 0)
+
+ def test_stopped_read(self):
+ t = self.StoppedTimerWithTime(12.3)
+ self.assertAlmostEqual(t.Read(), 12.3)
+ # Reading shouldn't change running state.
+ self.assertFalse(t.IsRunning())
+
+ # Even if time is progressed, it should still read the same value.
+ self.stub_time.current_time = 45.6
+ self.assertAlmostEqual(t.Read(), 12.3)
+
+ def test_stopped_start(self):
+ t = self.StoppedTimerWithTime(12.3)
+ t.Start()
+ self.assertTrue(t.IsRunning())
+ # Should not reset time.
+ self.assertAlmostEqual(t.Read(), 12.3)
+
+ def test_stopped_reset(self):
+ t = self.StoppedTimerWithTime(12.3)
+ t.Reset()
+ # Reset should stop and set to 0.
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 0)
+
+ def test_stopped_stop(self):
+ t = self.StoppedTimerWithTime(12.3)
+ t.Stop()
+ # Stop should have no further effect.
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 12.3)
+
+ def test_running_read(self):
+ t = self.RunningTimerWithTime(12.3)
+ self.assertAlmostEqual(t.Read(), 12.3)
+ # Read should not change running state.
+ self.assertTrue(t.IsRunning())
+
+ # If time is progressed, it should read the appropriate value.
+ self.stub_time.current_time = 45.6
+ self.assertAlmostEqual(t.Read(), 45.6)
+
+ def test_running_start(self):
+ t = self.RunningTimerWithTime(12.3)
+ t.Start()
+ # Should have no effect.
+ self.assertTrue(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 12.3)
+
+ def test_running_reset(self):
+ t = self.RunningTimerWithTime(12.3)
+ t.Reset()
+ # Reset should stop and set to 0.
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 0)
+
+ def test_running_stop(self):
+ t = self.RunningTimerWithTime(12.3)
+ t.Stop()
+ # Stop should stop but maintain time.
+ self.assertFalse(t.IsRunning())
+ self.assertAlmostEqual(t.Read(), 12.3)
+
+ def test_combined_read(self):
+ t = self.StoppedTimerWithTime(12.3)
+ self.stub_time.current_time = 20
+ t.Start()
+ self.stub_time.current_time = 45.6
+ expected_read_time = 12.3 + (45.6 - 20)
+ # When a timer has been previously stopped, current running time should
+ # be additive.
+ self.assertAlmostEqual(t.Read(), expected_read_time)
+ # Stopping again shouldn't overwrite this number.
+ t.Stop()
+ self.stub_time.current_time = 56.7
+ self.assertAlmostEqual(t.Read(), expected_read_time)
diff --git a/cli/lib/core/tool.py b/cli/lib/core/tool.py
index 699dbef..96f1da7 100644
--- a/cli/lib/core/tool.py
+++ b/cli/lib/core/tool.py
@@ -26,17 +26,17 @@ import error
class Error(error.Error):
- """Base class for all tool errors."""
+ """Base class for all tool errors."""
class ExecuteError(Error):
- """Raised when the tool fails to execute."""
- description = 'Failed to execute'
+ """Raised when the tool fails to execute."""
+ description = 'Failed to execute'
class ReturnError(Error):
- """Raised when the tool returns an error code."""
- description = 'Error code returned from tool'
+ """Raised when the tool returns an error code."""
+ description = 'Error code returned from tool'
# Variables to pass through to all command calls. These are mostly needed
@@ -48,295 +48,300 @@ DEFAULT_PASSTHROUGH_ENV = [
class ToolWrapper(object):
- """Wraps a host binary, target script, or build command.
-
- The advantages of this over using subprocess directly are:
- * Properly sets the execution working directory with set_cwd().
- * Strips '--' argument if given.
- * Restricts passthrough environment to a safe set of defaults.
- * Handles signal return codes properly.
- * Helper function to set common environment variables.
-
- Attributes:
- environment: a dictionary of environment variables to pass.
- """
-
- def __init__(self, path, env=None):
- """Initializes a ToolWrapper.
-
- Args:
- path: path to the tool executable.
- env: a dictionary of additional environmental variables to set.
- Can also be set after creation via the environment attribute.
- """
- self._tool_path = path
- self._cwd = None
- self.environment = {var: os.environ[var] for var in DEFAULT_PASSTHROUGH_ENV
- if var in os.environ}
- if env:
- self.environment.update(env)
-
- def set_cwd(self, cwd):
- self._cwd = cwd
-
- def set_android_environment(self, source_top=None, build_out=None, bsp=None):
- """Sets Android environment variables.
-
- Android has a few common variables used by a variety of tools. This
- function sets any environment variables that can be derived from
- the given arguments.
-
- All arguments are optional, any variables that depend on missing
- arguments will not be modified. Requirements are:
- ANDROID_BUILD_TOP: source_top
- ANDROID_HOST_OUT: build_out
- ANDROID_PRODUCT_OUT: build_out, bsp
-
- Args:
- source_top: root of the source tree.
- build_out: root of the build output folder.
- bsp: the BSP name.
- """
- if source_top:
- self.environment['ANDROID_BUILD_TOP'] = source_top
-
- if build_out:
- self.environment['ANDROID_HOST_OUT'] = os.path.join(
- build_out, 'host', util.GetHostArch())
-
- if build_out and bsp:
- self.environment['ANDROID_PRODUCT_OUT'] = util.GetAndroidProductOut(
- build_out, bsp)
-
- def add_caller_env_path(self):
- """Adds the caller's PATH environment variable.
+ """Wraps a host binary, target script, or build command.
- Most tools do not want to inherit PATH to avoid any unexpected
- interactions with the caller's environment. However, some tools
- do need the PATH variable (e.g. shell scripts may need to be able
- to find utilities like dirname).
+ The advantages of this over using subprocess directly are:
+ * Properly sets the execution working directory with set_cwd().
+ * Strips '--' argument if given.
+ * Restricts passthrough environment to a safe set of defaults.
+ * Handles signal return codes properly.
+ * Helper function to set common environment variables.
- If the tool already has a PATH, the caller's PATH is appended.
+ Attributes:
+ environment: a dictionary of environment variables to pass.
"""
- caller_path = os.environ.get('PATH')
- if caller_path is not None:
- if 'PATH' in self.environment:
- self.environment['PATH'] += os.pathsep + caller_path
- else:
- self.environment['PATH'] = caller_path
-
-
- def path(self):
- return self._tool_path
-
- def exists(self):
- return os.path.isfile(self._tool_path)
-
- def run(self, arg_array=None, piped=False):
- """Executes the tool and blocks until completion.
- Args:
- arg_array: list of string arguments to pass to the tool.
- piped: If true, send stdout and stderr to pipes.
-
- Raises:
- ExecuteError: if execution fails.
- ReturnError: if execution returns a non-0 exit code.
-
- Returns:
- (out, err): The output to stdout and stderr of the called tool.
- Will both be None if piped=False.
- """
- # Remove -- passthrough indicator from arg_array.
- if arg_array and arg_array[0] == '--':
- arg_array = arg_array[1:]
-
- # Make sure PWD is accurate on CWD change.
- if self._cwd:
- self.environment['PWD'] = os.path.abspath(self._cwd)
-
- stdout = None
- stderr = None
- if piped:
- stdout = subprocess.PIPE
- stderr = subprocess.PIPE
-
- try:
- tool_process = subprocess.Popen([self._tool_path] + (arg_array or []),
- env=self.environment, shell=False,
- cwd=self._cwd, stdout=stdout,
- stderr=stderr)
- (out, err) = tool_process.communicate()
- except OSError as e:
- # Catch and re-raise so we can include the tool path in the message.
- raise ExecuteError('"{}": {} [{}]'.format(
- self._tool_path, e.errno, e.strerror))
-
- # Exiting via signal gives negative return codes.
- ret = tool_process.returncode
- if ret < 0:
- # Return the normal shell exit mask for being signaled.
- ret = 128 - ret
- if ret != 0:
- raise ReturnError('"{}": {} ({})'.format(self._tool_path, ret, err),
- errno=ret)
-
- return (out, err)
+ def __init__(self, path, env=None):
+ """Initializes a ToolWrapper.
+
+ Args:
+ path: path to the tool executable.
+            env: a dictionary of additional environment variables to set.
+ Can also be set after creation via the environment attribute.
+ """
+ self._tool_path = path
+ self._cwd = None
+ self.environment = {var: os.environ[var]
+ for var in DEFAULT_PASSTHROUGH_ENV
+ if var in os.environ}
+ if env:
+ self.environment.update(env)
+
+ def set_cwd(self, cwd):
+ self._cwd = cwd
+
+ def set_android_environment(self, source_top=None, build_out=None,
+ bsp=None):
+ """Sets Android environment variables.
+
+ Android has a few common variables used by a variety of tools. This
+ function sets any environment variables that can be derived from
+ the given arguments.
+
+        All arguments are optional; any variables that depend on missing
+ arguments will not be modified. Requirements are:
+ ANDROID_BUILD_TOP: source_top
+ ANDROID_HOST_OUT: build_out
+ ANDROID_PRODUCT_OUT: build_out, bsp
+
+ Args:
+ source_top: root of the source tree.
+ build_out: root of the build output folder.
+ bsp: the BSP name.
+ """
+ if source_top:
+ self.environment['ANDROID_BUILD_TOP'] = source_top
+
+ if build_out:
+ self.environment['ANDROID_HOST_OUT'] = os.path.join(
+ build_out, 'host', util.GetHostArch())
+
+ if build_out and bsp:
+ self.environment['ANDROID_PRODUCT_OUT'] = util.GetAndroidProductOut(
+ build_out, bsp)
+
+ def add_caller_env_path(self):
+ """Adds the caller's PATH environment variable.
+
+ Most tools do not want to inherit PATH to avoid any unexpected
+ interactions with the caller's environment. However, some tools
+ do need the PATH variable (e.g. shell scripts may need to be able
+ to find utilities like dirname).
+
+ If the tool already has a PATH, the caller's PATH is appended.
+ """
+ caller_path = os.environ.get('PATH')
+ if caller_path is not None:
+ if 'PATH' in self.environment:
+ self.environment['PATH'] += os.pathsep + caller_path
+ else:
+ self.environment['PATH'] = caller_path
+
+
+ def path(self):
+ return self._tool_path
+
+ def exists(self):
+ return os.path.isfile(self._tool_path)
+
+ def run(self, arg_array=None, piped=False):
+ """Executes the tool and blocks until completion.
+
+ Args:
+ arg_array: list of string arguments to pass to the tool.
+ piped: If true, send stdout and stderr to pipes.
+
+ Raises:
+ ExecuteError: if execution fails.
+ ReturnError: if execution returns a non-0 exit code.
+
+ Returns:
+ (out, err): The output to stdout and stderr of the called tool.
+ Will both be None if piped=False.
+ """
+ # Remove -- passthrough indicator from arg_array.
+ if arg_array and arg_array[0] == '--':
+ arg_array = arg_array[1:]
+
+ # Make sure PWD is accurate on CWD change.
+ if self._cwd:
+ self.environment['PWD'] = os.path.abspath(self._cwd)
+
+ stdout = None
+ stderr = None
+ if piped:
+ stdout = subprocess.PIPE
+ stderr = subprocess.PIPE
+
+ try:
+ tool_process = subprocess.Popen(
+ [self._tool_path] + (arg_array or []),
+ env=self.environment, shell=False,
+ cwd=self._cwd, stdout=stdout,
+ stderr=stderr)
+ (out, err) = tool_process.communicate()
+ except OSError as e:
+ # Catch and re-raise so we can include the tool path in the message.
+ raise ExecuteError('"{}": {} [{}]'.format(
+ self._tool_path, e.errno, e.strerror))
+
+ # Exiting via signal gives negative return codes.
+ ret = tool_process.returncode
+ if ret < 0:
+ # Return the normal shell exit mask for being signaled.
+ ret = 128 - ret
+ if ret != 0:
+ raise ReturnError('"{}": {} ({})'.format(self._tool_path, ret, err),
+ errno=ret)
+
+ return (out, err)
class HostToolWrapper(ToolWrapper):
- """Wraps a tool from out/host/<arch>/bin/."""
+ """Wraps a tool from out/host/<arch>/bin/."""
- def __init__(self, path, build_out, bsp=None, env=None):
- """Initializes a HostToolWrapper.
+ def __init__(self, path, build_out, bsp=None, env=None):
+ """Initializes a HostToolWrapper.
- Args:
- path: tool path relative to <build_top>/out/host/<arch>/bin/.
- build_out: root of the build output folder where the tool lives.
- bsp: the BSP name. Optional, but should be set for any tool that
- uses ANDROID_PRODUCT_OUT (e.g. fastboot and adb).
- env: a dictionary of additional environmental variables to set.
- """
- # Initialize path to '' at first so we can use ANDROID_HOST_OUT.
- super(HostToolWrapper, self).__init__('', env=env)
- self.set_android_environment(build_out=build_out, bsp=bsp)
- self._tool_path = os.path.join(self.environment['ANDROID_HOST_OUT'],
- 'bin', path)
+ Args:
+ path: tool path relative to <build_top>/out/host/<arch>/bin/.
+ build_out: root of the build output folder where the tool lives.
+ bsp: the BSP name. Optional, but should be set for any tool that
+ uses ANDROID_PRODUCT_OUT (e.g. fastboot and adb).
+            env: a dictionary of additional environment variables to set.
+ """
+ # Initialize path to '' at first so we can use ANDROID_HOST_OUT.
+ super(HostToolWrapper, self).__init__('', env=env)
+ self.set_android_environment(build_out=build_out, bsp=bsp)
+ self._tool_path = os.path.join(self.environment['ANDROID_HOST_OUT'],
+ 'bin', path)
class HostToolRunner(object):
- """Serves as a HostToolWrapper factory."""
+ """Serves as a HostToolWrapper factory."""
- def __init__(self, build_out):
- self._build_out = build_out
+ def __init__(self, build_out):
+ self._build_out = build_out
- def run(self, path, args):
- host_tool = HostToolWrapper(path, self._build_out)
- return host_tool.run(args)
+ def run(self, path, args):
+ host_tool = HostToolWrapper(path, self._build_out)
+ return host_tool.run(args)
class PathToolWrapper(ToolWrapper):
- """Wraps a tool expected to be in the user PATH."""
+ """Wraps a tool expected to be in the user PATH."""
- def __init__(self, program, env=None):
- super(PathToolWrapper, self).__init__(program, env)
- self.add_caller_env_path()
+ def __init__(self, program, env=None):
+ super(PathToolWrapper, self).__init__(program, env)
+ self.add_caller_env_path()
class PathToolRunner(object):
- """Serves as a PathToolWrapper factory."""
+ """Serves as a PathToolWrapper factory."""
- def run(self, path, args):
- path_tool = PathToolWrapper(path)
- return path_tool.run(args)
+ def run(self, path, args):
+ path_tool = PathToolWrapper(path)
+ return path_tool.run(args)
class ProvisionDeviceTool(ToolWrapper):
- """Wraps the provision-device script.
-
- provision-device is unique since it's built as part of the product
- output rather than the host output, and also requires a pointer to
- the source tree which other tools don't, so it's useful to create a
- specific subclass for it.
-
- Note: provision-device allows two special environment variables
- ANDROID_PROVISION_VENDOR_PARTITIONS and ANDROID_PROVISION_OS_PARTITIONS
- that replace ANDROID_BUILD_TOP and ANDROID_PRODUCT_OUT if present.
- These are for advanced usage and could easily create confusion, so
- are intentionally not passed through here; if a user requires these
- they will have to call provision-device manually.
- """
-
- def __init__(self, source_top, build_out, bsp, env=None):
- """Initializes a ProvisionDeviceTool.
-
- Args:
- source_top: root of the source tree with the vendor BSP binaries.
- build_out: root of the build output folder where the tool lives.
- bsp: the BSP name.
- env: a dictionary of additional environmental variables to set.
+ """Wraps the provision-device script.
+
+ provision-device is unique since it's built as part of the product
+ output rather than the host output, and also requires a pointer to
+ the source tree which other tools don't, so it's useful to create a
+ specific subclass for it.
+
+ Note: provision-device allows two special environment variables
+ ANDROID_PROVISION_VENDOR_PARTITIONS and ANDROID_PROVISION_OS_PARTITIONS
+ that replace ANDROID_BUILD_TOP and ANDROID_PRODUCT_OUT if present.
+ These are for advanced usage and could easily create confusion, so
+ are intentionally not passed through here; if a user requires these
+ they will have to call provision-device manually.
"""
- # Initialize path to '' at first so we can use ANDROID_PRODUCT_OUT.
- super(ProvisionDeviceTool, self).__init__('', env=env)
- self.set_android_environment(source_top=source_top, build_out=build_out,
- bsp=bsp)
- self._tool_path = os.path.join(self.environment['ANDROID_PRODUCT_OUT'],
- 'provision-device')
- # provision-device is a shell script, so it needs to know PATH in order
- # to find utilities.
- self.add_caller_env_path()
+ def __init__(self, source_top, build_out, bsp, env=None):
+ """Initializes a ProvisionDeviceTool.
+
+ Args:
+ source_top: root of the source tree with the vendor BSP binaries.
+ build_out: root of the build output folder where the tool lives.
+ bsp: the BSP name.
+            env: a dictionary of additional environment variables to set.
+ """
+ # Initialize path to '' at first so we can use ANDROID_PRODUCT_OUT.
+ super(ProvisionDeviceTool, self).__init__('', env=env)
+ self.set_android_environment(source_top=source_top, build_out=build_out,
+ bsp=bsp)
+ self._tool_path = os.path.join(self.environment['ANDROID_PRODUCT_OUT'],
+ 'provision-device')
+
+ # provision-device is a shell script, so it needs to know PATH in order
+ # to find utilities.
+ self.add_caller_env_path()
class BrunchToolWrapper(ToolWrapper):
- """Legacy tool wrapper used by Brunch.
-
- This adds some additional functionality to the base ToolWrapper:
- * Adds additional Brunch-specific environment variables.
- * Sets PATH, ANDROID_PRODUCT_OUT, and ANDROID_BUILD_TOP using BDK config
- settings instead of passed in variables.
- """
-
- _UNTOUCHABLE_ENV = ['BDK_PATH', 'PATH', 'ANDROID_PRODUCT_OUT',
- 'ANDROID_BUILD_TOP']
-
- def __init__(self, config, product_path, path):
- super(BrunchToolWrapper, self).__init__(path)
-
- self._product_path = product_path
- self._config = config
- self._env_path = ''
- if os.environ.has_key('PATH'):
- self._env_path = os.environ['PATH']
-
- self._import_environ()
- self.environment.update({
- 'BDK_PATH': util.DEPRECATED_GetDefaultOSPath(),
- 'PATH': os.pathsep.join([p for p
- in [util.GetBDKPath('cli'), self._env_path]
- if p]),
- 'ANDROID_PRODUCT_OUT': os.path.join(self._product_path, 'out',
- 'out-' + self._config.device,
- 'target', 'product',
- self._config.device),
- 'ANDROID_BUILD_TOP': os.path.join(self._product_path, 'out', '.bdk')
- })
-
- def _import_environ(self):
- """Walk the global environment merging in allowed variables."""
- extra_vars = self._config.bdk.allowed_environ
- if extra_vars:
- for var in extra_vars.split(' '):
- if var in self._UNTOUCHABLE_ENV:
- print ("Cannot passthrough environment variable '{0}' "
- "as set in config/bdk/allowed_environ").format(var)
- if os.environ.has_key(var):
- self.environment[var] = os.environ[var]
+ """Legacy tool wrapper used by Brunch.
+
+ This adds some additional functionality to the base ToolWrapper:
+ * Adds additional Brunch-specific environment variables.
+ * Sets PATH, ANDROID_PRODUCT_OUT, and ANDROID_BUILD_TOP using BDK config
+ settings instead of passed in variables.
+ """
+
+ _UNTOUCHABLE_ENV = ['BDK_PATH', 'PATH', 'ANDROID_PRODUCT_OUT',
+ 'ANDROID_BUILD_TOP']
+
+ def __init__(self, config, product_path, path):
+ super(BrunchToolWrapper, self).__init__(path)
+
+ self._product_path = product_path
+ self._config = config
+ self._env_path = ''
+ if os.environ.has_key('PATH'):
+ self._env_path = os.environ['PATH']
+
+ self._import_environ()
+ self.environment.update({
+ 'BDK_PATH': util.DEPRECATED_GetDefaultOSPath(),
+ 'PATH': os.pathsep.join([p for p
+ in [util.GetBDKPath('cli'), self._env_path]
+ if p]),
+ 'ANDROID_PRODUCT_OUT': os.path.join(self._product_path, 'out',
+ 'out-' + self._config.device,
+ 'target', 'product',
+ self._config.device),
+ 'ANDROID_BUILD_TOP': os.path.join(self._product_path, 'out', '.bdk')
+ })
+
+ def _import_environ(self):
+ """Walk the global environment merging in allowed variables."""
+ extra_vars = self._config.bdk.allowed_environ
+ if extra_vars:
+ for var in extra_vars.split(' '):
+ if var in self._UNTOUCHABLE_ENV:
+ print ("Cannot pass through environment variable '{0}' "
+        "as set in config/bdk/allowed_environ").format(var)
+ if os.environ.has_key(var):
+ self.environment[var] = os.environ[var]
class BrunchHostToolWrapper(BrunchToolWrapper):
- """Wraps a host tool for brunch workflows."""
+ """Wraps a host tool for brunch workflows."""
- TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}',
- 'host', '{2}', 'bin', '{3}')
- DEFAULT_ARCH = 'linux-x86'
+ TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}',
+ 'host', '{2}', 'bin', '{3}')
+ DEFAULT_ARCH = 'linux-x86'
- def __init__(self, config, product_path, name, arch=DEFAULT_ARCH):
- self._tool_name = name
- self._host_arch = arch
- super(BrunchHostToolWrapper, self).__init__(config, product_path, name)
- self._tool_path = self._build_path()
+ def __init__(self, config, product_path, name, arch=DEFAULT_ARCH):
+ self._tool_name = name
+ self._host_arch = arch
+ super(BrunchHostToolWrapper, self).__init__(config, product_path, name)
+ self._tool_path = self._build_path()
- def _build_path(self):
- return self.TOOL_PATH_FMT.format(self._product_path, self._config.device,
- self._host_arch, self._tool_name)
+ def _build_path(self):
+ return self.TOOL_PATH_FMT.format(self._product_path,
+ self._config.device, self._host_arch,
+ self._tool_name)
class BrunchTargetToolWrapper(BrunchHostToolWrapper):
- """Wraps a target tool for brunch workflows."""
+ """Wraps a target tool for brunch workflows."""
- TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}',
- 'target', 'product', '{2}', '{3}')
+ TOOL_PATH_FMT = os.path.join('{0}', 'out', 'out-{1}',
+ 'target', 'product', '{2}', '{3}')
- def _build_path(self):
- return self.TOOL_PATH_FMT.format(self._product_path, self._config.device,
- self._config.device, self._tool_name)
+ def _build_path(self):
+ return self.TOOL_PATH_FMT.format(self._product_path,
+ self._config.device,
+ self._config.device, self._tool_name)
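
As a quick reference for the two TOOL_PATH_FMT strings above, the following
standalone sketch shows how a host tool path and a target tool path expand.
The literal values ('product_path', 'somedevice', 'fastboot') are illustrative
only and mirror the unit tests below rather than any real checkout.

    import os

    # Illustrative values: in the BDK they come from the product checkout
    # and the configured device name (config.device).
    product_path = 'product_path'
    device = 'somedevice'

    HOST_FMT = os.path.join('{0}', 'out', 'out-{1}', 'host', '{2}', 'bin', '{3}')
    TARGET_FMT = os.path.join('{0}', 'out', 'out-{1}',
                              'target', 'product', '{2}', '{3}')

    # Path separators assume a Linux host, which is all the BDK supports.
    host_tool = HOST_FMT.format(product_path, device, 'linux-x86', 'fastboot')
    assert host_tool == 'product_path/out/out-somedevice/host/linux-x86/bin/fastboot'

    target_tool = TARGET_FMT.format(product_path, device, device,
                                    'provision-device')
    assert target_tool == ('product_path/out/out-somedevice/target/product/'
                           'somedevice/provision-device')
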
diff --git a/cli/lib/core/tool_unittest.py b/cli/lib/core/tool_unittest.py
index 2400cf5..4bca8f1 100644
--- a/cli/lib/core/tool_unittest.py
+++ b/cli/lib/core/tool_unittest.py
@@ -26,310 +26,307 @@ from test import stubs
class ToolWrapperTest(unittest.TestCase):
- """Tests for the ToolWrapper classes."""
-
- def setUp(self):
- self.stub_os = stubs.StubOs()
- self.stub_subprocess = stubs.StubSubprocess()
- self.stub_util = util_stub.StubUtil()
-
- tool.os = self.stub_os
- tool.util = self.stub_util
- tool.subprocess = self.stub_subprocess
-
- def test_run(self):
- """Tests a basic tool run."""
- t = tool.ToolWrapper('/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(['/my/tool'])
-
- def test_run_piped(self):
- """Tests a basic piped tool run."""
- t = tool.ToolWrapper('/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out',
- ret_err='err')
- command2 = self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out',
- ret_err='err')
- self.assertEqual(t.run(), (None, None))
- self.assertEqual(t.run(piped=True), ('out', 'err'))
-
-
- def test_cwd(self):
- """Tests changing the tool working directory."""
- t = tool.ToolWrapper('/my/tool')
- t.set_cwd('/here')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(cwd='/here', env={'PWD': '/here'})
-
- def test_fail(self):
- """Tests a failed execution call."""
- def fail_subprocess_call():
- error = OSError()
- error.strerror = 'blah'
- raise error
-
- t = tool.ToolWrapper('/does/not/exist')
- self.stub_subprocess.AddCommand(side_effect=fail_subprocess_call)
-
- with self.assertRaises(tool.ExecuteError) as error:
- t.run()
- self.assertIn('/does/not/exist', str(error.exception))
- self.assertIn('blah', str(error.exception))
-
- def test_custom_env(self):
- """Tests passing custom environment variables."""
- t = tool.ToolWrapper('/my/tool', env={'foo': '1'})
- t.environment['bar'] = '2'
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(env={'foo': '1', 'bar': '2'})
-
- def test_inherited_env(self):
- """Tests inheriting environment variables."""
- self.stub_os.environ['USER'] = 'me'
- self.stub_os.environ['PATH'] = '/should/not/inherit'
- t = tool.ToolWrapper('/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(env={'USER': 'me'})
-
- def test_add_caller_env_path(self):
- """Tests adding the caller's environment PATH manually."""
- # No tool or caller PATH.
- t = tool.ToolWrapper('/my/tool')
- t.add_caller_env_path()
- self.assertNotIn('PATH', t.environment)
-
- # Tool PATH only.
- t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'})
- t.add_caller_env_path()
- self.assertEqual('/foo', t.environment['PATH'])
-
- # Caller PATH only.
- self.stub_os.environ['PATH'] = '/bar'
- t = tool.ToolWrapper('/my/tool')
- t.add_caller_env_path()
- self.assertEqual('/bar', t.environment['PATH'])
-
- # Both PATHs set.
- self.stub_os.environ['PATH'] = '/bar'
- t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'})
- t.add_caller_env_path()
- self.assertEqual('/foo:/bar', t.environment['PATH'])
-
- def test_source_top(self):
- """Tests setting source_top."""
- t = tool.ToolWrapper('/my/tool')
- t.set_android_environment(source_top='/foo')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(env={'ANDROID_BUILD_TOP': '/foo'})
-
- def test_build_out_and_bsp(self):
- """Tests passing build_top and bsp to the tool."""
- self.stub_util.arch = 'test_arch'
- t = tool.ToolWrapper('/my/tool')
- t.set_android_environment(build_out='/build/out', bsp='bar')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
-
- t.run()
- command.AssertCallContained(env={
- 'ANDROID_HOST_OUT': '/build/out/host/test_arch',
- 'ANDROID_PRODUCT_OUT': '/build/out/target/product/bar'
- })
-
- def test_host_tool_wrapper(self):
- """Tests HostToolWrapper construction."""
- self.stub_util.arch = 'test_arch'
- t = tool.HostToolWrapper('tool', '/build/out')
- self.stub_subprocess.AddCommand(['/build/out/host/test_arch/bin/tool'])
-
- t.run()
-
- def test_path_tool_wrapper(self):
- self.stub_os.environ = {'foo': 'bar', 'baz': 'bip', 'PATH': '/usr/bin'}
- t = tool.PathToolWrapper('tool')
- command = self.stub_subprocess.AddCommand(['tool'])
- t.run()
- command.AssertCallContained(['tool'], env={'PATH': '/usr/bin'})
-
- def test_provision_device_tool(self):
- """Tests ProvisionDeviceTool construction."""
- self.stub_util.arch = 'test_arch'
- t = tool.ProvisionDeviceTool(source_top='/source', build_out='/build/out',
- bsp='board_name')
- command = self.stub_subprocess.AddCommand()
-
- t.run()
- command.AssertCallContained(
- ['/build/out/target/product/board_name/provision-device'],
- env={'ANDROID_BUILD_TOP': '/source',
- 'ANDROID_HOST_OUT': '/build/out/host/test_arch',
- 'ANDROID_PRODUCT_OUT': '/build/out/target/product/board_name'})
+ """Tests for the ToolWrapper classes."""
+
+ def setUp(self):
+ self.stub_os = stubs.StubOs()
+ self.stub_subprocess = stubs.StubSubprocess()
+ self.stub_util = util_stub.StubUtil()
+
+ tool.os = self.stub_os
+ tool.util = self.stub_util
+ tool.subprocess = self.stub_subprocess
+
+ def test_run(self):
+ """Tests a basic tool run."""
+ t = tool.ToolWrapper('/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(['/my/tool'])
+
+ def test_run_piped(self):
+ """Tests a basic piped tool run."""
+ t = tool.ToolWrapper('/my/tool')
+ self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out',
+ ret_err='err')
+ self.stub_subprocess.AddCommand(['/my/tool'], ret_out='out',
+ ret_err='err')
+ self.assertEqual(t.run(), (None, None))
+ self.assertEqual(t.run(piped=True), ('out', 'err'))
+
+
+ def test_cwd(self):
+ """Tests changing the tool working directory."""
+ t = tool.ToolWrapper('/my/tool')
+ t.set_cwd('/here')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(cwd='/here', env={'PWD': '/here'})
+
+ def test_fail(self):
+ """Tests a failed execution call."""
+ def fail_subprocess_call():
+ error = OSError()
+ error.strerror = 'blah'
+ raise error
+
+ t = tool.ToolWrapper('/does/not/exist')
+ self.stub_subprocess.AddCommand(side_effect=fail_subprocess_call)
+
+ with self.assertRaises(tool.ExecuteError) as error:
+ t.run()
+ self.assertIn('/does/not/exist', str(error.exception))
+ self.assertIn('blah', str(error.exception))
+
+ def test_custom_env(self):
+ """Tests passing custom environment variables."""
+ t = tool.ToolWrapper('/my/tool', env={'foo': '1'})
+ t.environment['bar'] = '2'
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(env={'foo': '1', 'bar': '2'})
+
+ def test_inherited_env(self):
+ """Tests inheriting environment variables."""
+ self.stub_os.environ['USER'] = 'me'
+ self.stub_os.environ['PATH'] = '/should/not/inherit'
+ t = tool.ToolWrapper('/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(env={'USER': 'me'})
+
+ def test_add_caller_env_path(self):
+ """Tests adding the caller's environment PATH manually."""
+ # No tool or caller PATH.
+ t = tool.ToolWrapper('/my/tool')
+ t.add_caller_env_path()
+ self.assertNotIn('PATH', t.environment)
+
+ # Tool PATH only.
+ t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'})
+ t.add_caller_env_path()
+ self.assertEqual('/foo', t.environment['PATH'])
+
+ # Caller PATH only.
+ self.stub_os.environ['PATH'] = '/bar'
+ t = tool.ToolWrapper('/my/tool')
+ t.add_caller_env_path()
+ self.assertEqual('/bar', t.environment['PATH'])
+
+ # Both PATHs set.
+ self.stub_os.environ['PATH'] = '/bar'
+ t = tool.ToolWrapper('/my/tool', env={'PATH': '/foo'})
+ t.add_caller_env_path()
+ self.assertEqual('/foo:/bar', t.environment['PATH'])
+
+ def test_source_top(self):
+ """Tests setting source_top."""
+ t = tool.ToolWrapper('/my/tool')
+ t.set_android_environment(source_top='/foo')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(env={'ANDROID_BUILD_TOP': '/foo'})
+
+ def test_build_out_and_bsp(self):
+ """Tests passing build_top and bsp to the tool."""
+ self.stub_util.arch = 'test_arch'
+ t = tool.ToolWrapper('/my/tool')
+ t.set_android_environment(build_out='/build/out', bsp='bar')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
+
+ t.run()
+ command.AssertCallContained(env={
+ 'ANDROID_HOST_OUT': '/build/out/host/test_arch',
+ 'ANDROID_PRODUCT_OUT': '/build/out/target/product/bar'
+ })
+
+ def test_host_tool_wrapper(self):
+ """Tests HostToolWrapper construction."""
+ self.stub_util.arch = 'test_arch'
+ t = tool.HostToolWrapper('tool', '/build/out')
+ self.stub_subprocess.AddCommand(['/build/out/host/test_arch/bin/tool'])
+
+ t.run()
+
+ def test_path_tool_wrapper(self):
+ self.stub_os.environ = {'foo': 'bar', 'baz': 'bip', 'PATH': '/usr/bin'}
+ t = tool.PathToolWrapper('tool')
+ command = self.stub_subprocess.AddCommand(['tool'])
+ t.run()
+ command.AssertCallContained(['tool'], env={'PATH': '/usr/bin'})
+
+ def test_provision_device_tool(self):
+ """Tests ProvisionDeviceTool construction."""
+ self.stub_util.arch = 'test_arch'
+ t = tool.ProvisionDeviceTool(source_top='/source',
+ build_out='/build/out', bsp='board_name')
+ command = self.stub_subprocess.AddCommand()
+
+ t.run()
+ command.AssertCallContained(
+ ['/build/out/target/product/board_name/provision-device'],
+ env={'ANDROID_BUILD_TOP': '/source',
+ 'ANDROID_HOST_OUT': '/build/out/host/test_arch',
+ 'ANDROID_PRODUCT_OUT': '/build/out/target/product/board_name'})
class BrunchToolWrapperTest(unittest.TestCase):
- def setUp(self):
- self.stub_subprocess = stubs.StubSubprocess()
- self.stub_util = util_stub.StubUtil()
+ def setUp(self):
+ self.stub_subprocess = stubs.StubSubprocess()
+ self.stub_util = util_stub.StubUtil()
- tool.util = self.stub_util
- tool.os = stubs.StubOs()
- tool.subprocess = self.stub_subprocess
+ tool.util = self.stub_util
+ tool.os = stubs.StubOs()
+ tool.subprocess = self.stub_subprocess
- self.saved_props = (config.DictStore.REQUIRED_PROPS,
- config.DictStore.OPTIONAL_PROPS)
- config.DictStore.REQUIRED_PROPS = config.ProductFileStore.REQUIRED_PROPS
- config.DictStore.OPTIONAL_PROPS = config.ProductFileStore.OPTIONAL_PROPS
- self.store = config.DictStore()
- self.store.device = 'somedevice'
+ self.saved_props = (config.DictStore.REQUIRED_PROPS,
+ config.DictStore.OPTIONAL_PROPS)
+ config.DictStore.REQUIRED_PROPS = config.ProductFileStore.REQUIRED_PROPS
+ config.DictStore.OPTIONAL_PROPS = config.ProductFileStore.OPTIONAL_PROPS
+ self.store = config.DictStore()
+ self.store.device = 'somedevice'
- self.product_path = 'product_path'
+ self.product_path = 'product_path'
- def tearDown(self):
- config.DictStore.REQUIRED_PROPS = self.saved_props[0]
- config.DictStore.OPTIONAL_PROPS = self.saved_props[1]
+ def tearDown(self):
+ config.DictStore.REQUIRED_PROPS = self.saved_props[0]
+ config.DictStore.OPTIONAL_PROPS = self.saved_props[1]
class TestBrunchToolRun(BrunchToolWrapperTest):
- def test_base(self):
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- t.environment.clear()
- t.environment['PATH'] = 'abc:123'
- t.environment['BAZ'] = 'B "A" R'
+ def test_base(self):
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ t.environment.clear()
+ t.environment['PATH'] = 'abc:123'
+ t.environment['BAZ'] = 'B "A" R'
- command = self.stub_subprocess.AddCommand(ret_code=0)
- t.run(['arg1'])
- command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
- stdout=None,
- stderr=None,
- env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
+ command = self.stub_subprocess.AddCommand(ret_code=0)
+ t.run(['arg1'])
+ command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
+ stdout=None, stderr=None,
+ env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
- command = self.stub_subprocess.AddCommand(ret_code=1)
- with self.assertRaises(tool.ReturnError) as e:
- t.run(['arg1'])
- self.assertEqual(1, e.exception.errno)
- command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
- stdout=None,
- stderr=None,
- env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
+ command = self.stub_subprocess.AddCommand(ret_code=1)
+ with self.assertRaises(tool.ReturnError) as e:
+ t.run(['arg1'])
+ self.assertEqual(1, e.exception.errno)
+ command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
+ stdout=None, stderr=None,
+ env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
- def test_sigabrt(self):
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- self.stub_subprocess.AddCommand(['/my/tool', 'arg1'], ret_code=-6)
+ def test_sigabrt(self):
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ self.stub_subprocess.AddCommand(['/my/tool', 'arg1'], ret_code=-6)
- with self.assertRaises(tool.ReturnError) as e:
- t.run(['arg1'])
- self.assertEqual(128 + 6, e.exception.errno)
+ with self.assertRaises(tool.ReturnError) as e:
+ t.run(['arg1'])
+ self.assertEqual(128 + 6, e.exception.errno)
- def test_cwd(self):
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- t.set_cwd('/here')
- command = self.stub_subprocess.AddCommand(['/my/tool'])
+ def test_cwd(self):
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ t.set_cwd('/here')
+ command = self.stub_subprocess.AddCommand(['/my/tool'])
- t.run(['arg1'])
- command.AssertCallContained(cwd='/here')
+ t.run(['arg1'])
+ command.AssertCallContained(cwd='/here')
class TestBrunchToolEnvironment(BrunchToolWrapperTest):
- def test_baseline(self):
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- t.environment.clear()
- t.environment['PATH'] = 'abc:123'
- t.environment['BAZ'] = 'B "A" R'
- command = self.stub_subprocess.AddCommand()
-
- t.run(['arg1'])
- command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
- stdout=None,
- stderr=None,
- env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
-
- def test_no_path(self):
- """Ensures no exceptions if there is no PATH."""
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
-
- t.run(['arg1'])
- self.assertEqual(self.stub_util.GetBDKPath('cli'),
- command.GetCallArgs()[1]['env']['PATH'])
-
- def test_untouchable(self):
- """Tests config/bdk/allowed_environ limits."""
- self.store.bdk.allowed_environ = 'BDK_PATH'
- tool.os.environ['BDK_PATH'] = '/path/to/crazy/town'
-
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
-
- t.run(['arg1'])
- self.assertEqual(self.stub_util.DEPRECATED_GetDefaultOSPath(),
- command.GetCallArgs()[1]['env']['BDK_PATH'])
-
- def test_passthrough(self):
- """Ensures config/bdk/allowed_environ is respected."""
- self.store.bdk.allowed_environ = 'FUZZY WUZZY'
- tool.os.environ['FUZZY'] = 'WUZ'
- tool.os.environ['WUZZY'] = 'A BEAR'
-
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
-
- t.run(['arg1'])
- call_env = command.GetCallArgs()[1]['env']
- self.assertEqual(call_env['FUZZY'], 'WUZ')
- self.assertEqual(call_env['WUZZY'], 'A BEAR')
- self.assertEqual(call_env['BDK_PATH'],
- self.stub_util.DEPRECATED_GetDefaultOSPath())
-
- def test_product_out(self):
- """Ensures ANDROID_PRODUCT_OUT meets the contract from adb."""
- t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
- command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
-
- t.run(['arg1'])
- self.assertEqual(
- 'product_path/out/out-somedevice/target/product/somedevice',
- command.GetCallArgs()[1]['env']['ANDROID_PRODUCT_OUT'])
+ def test_baseline(self):
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ t.environment.clear()
+ t.environment['PATH'] = 'abc:123'
+ t.environment['BAZ'] = 'B "A" R'
+ command = self.stub_subprocess.AddCommand()
+
+ t.run(['arg1'])
+ command.AssertCallWas(['/my/tool', 'arg1'], shell=False, cwd=None,
+ stdout=None, stderr=None,
+ env={'PATH': 'abc:123', 'BAZ': 'B "A" R'})
+
+ def test_no_path(self):
+ """Ensures no exceptions if there is no PATH."""
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
+
+ t.run(['arg1'])
+ self.assertEqual(self.stub_util.GetBDKPath('cli'),
+ command.GetCallArgs()[1]['env']['PATH'])
+
+ def test_untouchable(self):
+ """Tests config/bdk/allowed_environ limits."""
+ self.store.bdk.allowed_environ = 'BDK_PATH'
+ tool.os.environ['BDK_PATH'] = '/path/to/crazy/town'
+
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
+
+ t.run(['arg1'])
+ self.assertEqual(self.stub_util.DEPRECATED_GetDefaultOSPath(),
+ command.GetCallArgs()[1]['env']['BDK_PATH'])
+
+ def test_passthrough(self):
+ """Ensures config/bdk/allowed_environ is respected."""
+ self.store.bdk.allowed_environ = 'FUZZY WUZZY'
+ tool.os.environ['FUZZY'] = 'WUZ'
+ tool.os.environ['WUZZY'] = 'A BEAR'
+
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
+
+ t.run(['arg1'])
+ call_env = command.GetCallArgs()[1]['env']
+ self.assertEqual(call_env['FUZZY'], 'WUZ')
+ self.assertEqual(call_env['WUZZY'], 'A BEAR')
+ self.assertEqual(call_env['BDK_PATH'],
+ self.stub_util.DEPRECATED_GetDefaultOSPath())
+
+ def test_product_out(self):
+ """Ensures ANDROID_PRODUCT_OUT meets the contract from adb."""
+ t = tool.BrunchToolWrapper(self.store, self.product_path, '/my/tool')
+ command = self.stub_subprocess.AddCommand(['/my/tool', 'arg1'])
+
+ t.run(['arg1'])
+ self.assertEqual(
+ 'product_path/out/out-somedevice/target/product/somedevice',
+ command.GetCallArgs()[1]['env']['ANDROID_PRODUCT_OUT'])
class TestBrunchHostToolWrapper(BrunchToolWrapperTest):
- def test_path(self):
- t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'tool')
- self.assertEqual(
- 'product_path/out/out-somedevice/host/linux-x86/bin/tool',
- t.path())
+ def test_path(self):
+ t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'tool')
+ self.assertEqual(
+ 'product_path/out/out-somedevice/host/linux-x86/bin/tool',
+ t.path())
- t = tool.BrunchHostToolWrapper(self.store, self.product_path, 'fastboot',
- arch='win-mips')
- self.assertEqual(
- 'product_path/out/out-somedevice/host/win-mips/bin/fastboot',
- t.path())
+ t = tool.BrunchHostToolWrapper(self.store, self.product_path,
+ 'fastboot', arch='win-mips')
+ self.assertEqual(
+ 'product_path/out/out-somedevice/host/win-mips/bin/fastboot',
+ t.path())
class TestBrunchTargetToolWrapper(BrunchToolWrapperTest):
- def test_path(self):
- t = tool.BrunchTargetToolWrapper(self.store, self.product_path, 'tool')
- self.assertEqual(
- 'product_path/out/out-somedevice/target/product/somedevice/tool',
- t.path())
-
- t = tool.BrunchTargetToolWrapper(self.store, self.product_path,
- 'provision-dev')
- self.assertEqual(
- ('product_path/out/out-somedevice/target/product/somedevice/'
- 'provision-dev'),
- t.path())
+ def test_path(self):
+ t = tool.BrunchTargetToolWrapper(self.store, self.product_path, 'tool')
+ self.assertEqual(
+ 'product_path/out/out-somedevice/target/product/somedevice/tool',
+ t.path())
+
+ t = tool.BrunchTargetToolWrapper(self.store, self.product_path,
+ 'provision-dev')
+ self.assertEqual(
+ ('product_path/out/out-somedevice/target/product/somedevice/'
+ 'provision-dev'),
+ t.path())
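
test_sigabrt above pins down the exit-code convention the wrappers use when a
child dies from a signal: subprocess reports the death as a negative return
code, and the wrapper surfaces it as the shell-style status 128 + signal
number. Below is a minimal, self-contained sketch of that mapping; it is an
illustration of the convention, not the ToolWrapper code itself.

    import signal

    def shell_style_exit_code(returncode):
        # Negative subprocess return codes mean "killed by signal N";
        # shells conventionally report that as exit status 128 + N.
        if returncode < 0:
            return 128 + (-returncode)
        return returncode

    # Matches test_sigabrt: SIGABRT is signal 6 on Linux, so -6 maps to 134.
    assert shell_style_exit_code(-signal.SIGABRT) == 128 + signal.SIGABRT
    assert shell_style_exit_code(0) == 0
    assert shell_style_exit_code(1) == 1
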
diff --git a/cli/lib/core/user_config.py b/cli/lib/core/user_config.py
index b6a7398..768c63d 100644
--- a/cli/lib/core/user_config.py
+++ b/cli/lib/core/user_config.py
@@ -16,7 +16,6 @@
"""A class storing user configuration."""
-import os
import uuid
from core import config
@@ -34,66 +33,67 @@ PATHS = [BSP_DIR, PLATFORM_CACHE]
class UserConfig(config.Store):
- REQUIRED_PROPS = {METRICS_OPT_IN:['0', '1'], UID:[]}
- OPTIONAL_PROPS = {BSP_DIR:[], PLATFORM_CACHE:[]}
- PREFIX = 'user_'
-
- def __init__(self, file_path='', table='user'):
- file_path = file_path or util.GetUserDataPath('config.db')
- super(UserConfig, self).__init__(file_path, table)
-
- self._defaults = {
- BSP_DIR: util.GetBDKPath('BSPs'),
- PLATFORM_CACHE: util.GetBDKPath('platform_cache')
- }
-
-
- def initialize(self, opt_in=None):
- """Sets up the user store.
-
- Prompts the user for information when necessary.
-
- Args:
- opt_in: (optional) If provided, skip prompting and set
- metrics_opt_in to the provided value. Fixes True to '1'
- and False to '0'.
- """
- # Fix booleans to strings.
- if opt_in == False:
- opt_in = '0'
- elif opt_in == True:
- opt_in = '1'
-
- # Check for opt in
- print ('To help improve the quality of this product, we collect\n'
- 'anonymized data on how the BDK is used. You may choose to opt out\n'
- 'of this collection now (by choosing "N" at the below prompt),\n'
- 'or at any point in the future by running the following command:\n'
- ' bdk config set metrics_opt_in 0')
- while opt_in is None:
- choice = util.GetUserInput( 'Do you want to help improve the '
- 'Project Brillo BDK?\n'
- '(Y/n) ').strip().upper()
- if not choice or choice == 'Y':
- opt_in = '1'
- elif choice == 'N':
- opt_in = '0'
-
- # pylint: disable=attribute-defined-outside-init
- self.metrics_opt_in = opt_in
-
- self.uid = str(uuid.uuid4())
-
- def _save(self, key, val):
- if not val and key in self._defaults:
- val = self._defaults[key]
- return super(UserConfig, self)._save(key, val)
-
- def _load(self, key):
- result = super(UserConfig, self)._load(key)
- if not result and key in self._defaults:
- result = self._defaults[key]
- return result
+ REQUIRED_PROPS = {METRICS_OPT_IN:['0', '1'], UID:[]}
+ OPTIONAL_PROPS = {BSP_DIR:[], PLATFORM_CACHE:[]}
+ PREFIX = 'user_'
+
+ def __init__(self, file_path='', table='user'):
+ file_path = file_path or util.GetUserDataPath('config.db')
+ super(UserConfig, self).__init__(file_path, table)
+
+ self._defaults = {
+ BSP_DIR: util.GetBDKPath('BSPs'),
+ PLATFORM_CACHE: util.GetBDKPath('platform_cache')
+ }
+
+
+ def initialize(self, opt_in=None):
+ """Sets up the user store.
+
+ Prompts the user for information when necessary.
+
+ Args:
+ opt_in: (optional) If provided, skip prompting and set
+ metrics_opt_in to the provided value. Fixes True to '1'
+ and False to '0'.
+ """
+ # Fix booleans to strings.
+ if opt_in == False:
+ opt_in = '0'
+ elif opt_in == True:
+ opt_in = '1'
+
+ # Check for opt in
+ print ('To help improve the quality of this product, we collect\n'
+ 'anonymized data on how the BDK is used. You may choose to opt\n'
+ 'out of this collection now (by choosing "N" at the below\n'
+ 'prompt), or at any point in the future by running the\n'
+ 'following command:\n'
+ ' bdk config set metrics_opt_in 0')
+ while opt_in is None:
+ choice = util.GetUserInput('Do you want to help improve the '
+ 'Project Brillo BDK?\n'
+ '(Y/n) ').strip().upper()
+ if not choice or choice == 'Y':
+ opt_in = '1'
+ elif choice == 'N':
+ opt_in = '0'
+
+ # pylint: disable=attribute-defined-outside-init
+ self.metrics_opt_in = opt_in
+
+ self.uid = str(uuid.uuid4())
+
+ def _save(self, key, val):
+ if not val and key in self._defaults:
+ val = self._defaults[key]
+ return super(UserConfig, self)._save(key, val)
+
+ def _load(self, key):
+ result = super(UserConfig, self)._load(key)
+ if not result and key in self._defaults:
+ result = self._defaults[key]
+ return result
# A default global UserConfig.
USER_CONFIG = UserConfig()
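
The _save()/_load() overrides above are what make bsp_dir and platform_cache
fall back to their defaults when cleared. A minimal sketch of that behaviour,
with a plain dict standing in for the underlying config.Store and an
illustrative default path:

    DEFAULTS = {'bsp_dir': '/path/to/bdk/BSPs'}   # illustrative default only
    store = {}

    def save(key, val):
        # Empty values for keys with a default are replaced by the default,
        # mirroring UserConfig._save().
        if not val and key in DEFAULTS:
            val = DEFAULTS[key]
        store[key] = val

    def load(key):
        # Likewise, an empty stored value reads back as the default,
        # mirroring UserConfig._load().
        result = store.get(key)
        if not result and key in DEFAULTS:
            result = DEFAULTS[key]
        return result

    save('bsp_dir', '')                     # clearing reverts to the default
    assert load('bsp_dir') == DEFAULTS['bsp_dir']
    save('bsp_dir', '/custom/bsps')         # explicit values stick
    assert load('bsp_dir') == '/custom/bsps'
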
diff --git a/cli/lib/core/user_config_stub.py b/cli/lib/core/user_config_stub.py
index 8b2483b..47fcace 100644
--- a/cli/lib/core/user_config_stub.py
+++ b/cli/lib/core/user_config_stub.py
@@ -17,26 +17,21 @@
"""core.user_config module stubs."""
-import collections
-import os
-
from core import user_config
-from core import util
-import error
class StubUserConfig(object):
- """Stubs for our core.user_config module."""
+ """Stubs for our core.user_config module."""
- EXPOSED = user_config.EXPOSED
+ EXPOSED = user_config.EXPOSED
- class UserConfig(object):
- def __init__(self, file_path='user_data.db', table='totally_private'):
- self.file_path = file_path
- self.table = table
- self.metrics_opt_in = '0'
- self.uid = 'user_id'
- self.bsp_dir = 'somewhere/bsps'
- self.platform_cache = 'elsewhere/pc'
+ class UserConfig(object):
+ def __init__(self, file_path='user_data.db', table='totally_private'):
+ self.file_path = file_path
+ self.table = table
+ self.metrics_opt_in = '0'
+ self.uid = 'user_id'
+ self.bsp_dir = 'somewhere/bsps'
+ self.platform_cache = 'elsewhere/pc'
- USER_CONFIG = UserConfig()
+ USER_CONFIG = UserConfig()
diff --git a/cli/lib/core/user_config_unittest.py b/cli/lib/core/user_config_unittest.py
index 5b85697..a1fee76 100644
--- a/cli/lib/core/user_config_unittest.py
+++ b/cli/lib/core/user_config_unittest.py
@@ -26,85 +26,86 @@ from test import stubs
class UserConfigTest(unittest.TestCase):
- def setUp(self):
- self.stub_os = stubs.StubOs()
- self.stub_util = util_stub.StubUtil()
- self.stub_uuid = stubs.StubUuid()
-
- user_config.os = self.stub_os
- user_config.util = self.stub_util
- user_config.uuid = self.stub_uuid
-
- self.user_config = user_config.UserConfig(file_path=':memory:')
-
- def test_default_config(self):
- uc = user_config.UserConfig()
- # Default user config should be in user data dir.
- self.assertTrue(uc._path.startswith(self.stub_util.user_data_path))
-
- def test_initialize_opt_in(self):
- self.user_config.initialize(opt_in=True)
- self.assertEqual(self.user_config.metrics_opt_in, '1')
- self.assertEqual(self.user_config.uid, self.stub_uuid.generate)
-
- def test_initialize_opt_out(self):
- self.user_config.initialize(opt_in=False)
- self.assertEqual(self.user_config.metrics_opt_in, '0')
- self.assertEqual(self.user_config.uid, self.stub_uuid.generate)
-
- def test_opt_in_values(self):
- self.user_config.metrics_opt_in = '0'
- self.assertEqual(self.user_config.metrics_opt_in, '0')
- self.user_config.metrics_opt_in = '1'
- self.assertEqual(self.user_config.metrics_opt_in, '1')
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = '2'
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = ''
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = '-1'
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = 'not_a_number'
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = None
- # Must be given in string form.
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = 1
- with self.assertRaises(ValueError):
- self.user_config.metrics_opt_in = 0
-
- def test_defaults(self):
- self.assertIsNone(self.user_config.metrics_opt_in)
- self.assertIsNone(self.user_config.uid)
- self.assertEqual(self.user_config.bsp_dir,
- self.stub_util.GetBDKPath('BSPs'))
- self.assertEqual(self.user_config.platform_cache,
- self.stub_util.GetBDKPath('platform_cache'))
-
- # Set values (except for metrics_opt_in, tested above).
- self.user_config.uid = 'uid!'
- self.assertEqual(self.user_config.uid, 'uid!')
- self.user_config.bsp_dir = 'bsp_dir!'
- self.assertEqual(self.user_config.bsp_dir, 'bsp_dir!')
- self.user_config.platform_cache = 'platform_cache!'
- self.assertEqual(self.user_config.platform_cache, 'platform_cache!')
-
- # Reset values.
- self.user_config.uid = None
- self.assertIsNone(self.user_config.uid)
- self.user_config.bsp_dir = None
- self.assertEqual(self.user_config.bsp_dir,
- self.stub_util.GetBDKPath('BSPs'))
- self.user_config.platform_cache = None
- self.assertEqual(self.user_config.platform_cache,
- self.stub_util.GetBDKPath('platform_cache'))
-
- # Empty string should behave like default for those with defaults.
- self.user_config.uid = ''
- self.assertEqual(self.user_config.uid, '')
- self.user_config.bsp_dir = ''
- self.assertEqual(self.user_config.bsp_dir,
- self.stub_util.GetBDKPath('BSPs'))
- self.user_config.platform_cache = ''
- self.assertEqual(self.user_config.platform_cache,
- self.stub_util.GetBDKPath('platform_cache'))
+ def setUp(self):
+ self.stub_os = stubs.StubOs()
+ self.stub_util = util_stub.StubUtil()
+ self.stub_uuid = stubs.StubUuid()
+
+ user_config.os = self.stub_os
+ user_config.util = self.stub_util
+ user_config.uuid = self.stub_uuid
+
+ self.user_config = user_config.UserConfig(file_path=':memory:')
+
+ def test_default_config(self):
+ uc = user_config.UserConfig()
+ # Default user config should be in user data dir.
+ # pylint: disable=protected-access
+ self.assertTrue(uc._path.startswith(self.stub_util.user_data_path))
+
+ def test_initialize_opt_in(self):
+ self.user_config.initialize(opt_in=True)
+ self.assertEqual(self.user_config.metrics_opt_in, '1')
+ self.assertEqual(self.user_config.uid, self.stub_uuid.generate)
+
+ def test_initialize_opt_out(self):
+ self.user_config.initialize(opt_in=False)
+ self.assertEqual(self.user_config.metrics_opt_in, '0')
+ self.assertEqual(self.user_config.uid, self.stub_uuid.generate)
+
+ def test_opt_in_values(self):
+ self.user_config.metrics_opt_in = '0'
+ self.assertEqual(self.user_config.metrics_opt_in, '0')
+ self.user_config.metrics_opt_in = '1'
+ self.assertEqual(self.user_config.metrics_opt_in, '1')
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = '2'
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = ''
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = '-1'
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = 'not_a_number'
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = None
+ # Must be given in string form.
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = 1
+ with self.assertRaises(ValueError):
+ self.user_config.metrics_opt_in = 0
+
+ def test_defaults(self):
+ self.assertIsNone(self.user_config.metrics_opt_in)
+ self.assertIsNone(self.user_config.uid)
+ self.assertEqual(self.user_config.bsp_dir,
+ self.stub_util.GetBDKPath('BSPs'))
+ self.assertEqual(self.user_config.platform_cache,
+ self.stub_util.GetBDKPath('platform_cache'))
+
+ # Set values (except for metrics_opt_in, tested above).
+ self.user_config.uid = 'uid!'
+ self.assertEqual(self.user_config.uid, 'uid!')
+ self.user_config.bsp_dir = 'bsp_dir!'
+ self.assertEqual(self.user_config.bsp_dir, 'bsp_dir!')
+ self.user_config.platform_cache = 'platform_cache!'
+ self.assertEqual(self.user_config.platform_cache, 'platform_cache!')
+
+ # Reset values.
+ self.user_config.uid = None
+ self.assertIsNone(self.user_config.uid)
+ self.user_config.bsp_dir = None
+ self.assertEqual(self.user_config.bsp_dir,
+ self.stub_util.GetBDKPath('BSPs'))
+ self.user_config.platform_cache = None
+ self.assertEqual(self.user_config.platform_cache,
+ self.stub_util.GetBDKPath('platform_cache'))
+
+ # Empty string should behave like default for those with defaults.
+ self.user_config.uid = ''
+ self.assertEqual(self.user_config.uid, '')
+ self.user_config.bsp_dir = ''
+ self.assertEqual(self.user_config.bsp_dir,
+ self.stub_util.GetBDKPath('BSPs'))
+ self.user_config.platform_cache = ''
+ self.assertEqual(self.user_config.platform_cache,
+ self.stub_util.GetBDKPath('platform_cache'))
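
test_opt_in_values above relies on the store rejecting anything other than
the strings '0' and '1' for metrics_opt_in, per its REQUIRED_PROPS entry. The
sketch below is a check consistent with those tests; validate_opt_in is an
illustrative helper, not part of config.Store.

    ALLOWED_METRICS_OPT_IN = ['0', '1']

    def validate_opt_in(value):
        # Only the exact strings '0'/'1' are accepted; integers, None and
        # other strings raise ValueError, as the tests above expect.
        if value not in ALLOWED_METRICS_OPT_IN:
            raise ValueError('metrics_opt_in must be one of %r, got %r'
                             % (ALLOWED_METRICS_OPT_IN, value))
        return value

    assert validate_opt_in('1') == '1'
    try:
        validate_opt_in(1)          # integer form is rejected
    except ValueError:
        pass
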
diff --git a/cli/lib/core/util.py b/cli/lib/core/util.py
index d66e2da..a9ecd5c 100644
--- a/cli/lib/core/util.py
+++ b/cli/lib/core/util.py
@@ -25,19 +25,19 @@ import error
class Error(error.Error):
- """Base error for util module."""
+ """Base error for util module."""
class HostUnsupportedArchError(Error):
- """Raised when run on an unsupported host architecture."""
+ """Raised when run on an unsupported host architecture."""
class PathError(Error):
- """Raised when an expected path can't be found."""
+ """Raised when an expected path can't be found."""
class OSVersionError(Error):
- """Raised when an invalid OS version is requested."""
+ """Raised when an invalid OS version is requested."""
ANDROID_PRODUCTS_MK = 'AndroidProducts.mk'
@@ -48,136 +48,136 @@ PLATFORM_CACHE_FORMAT = os.path.join('{user_configured_platform_cache}',
def GetBDKPath(*relpath_args):
- """Find a path relative to the base of the BDK."""
- # We know this file is located in <bdk>/cli/lib/core/.
- core_path = os.path.dirname(__file__)
- return os.path.abspath(os.path.join(core_path, '..', '..', '..',
- *relpath_args))
+ """Find a path relative to the base of the BDK."""
+ # We know this file is located in <bdk>/cli/lib/core/.
+ core_path = os.path.dirname(__file__)
+ return os.path.abspath(os.path.join(core_path, '..', '..', '..',
+ *relpath_args))
def GetUserDataPath(*relpath_args):
- """Find a path relative to the user data dir."""
- # User data is stored at <bdk>/user_data.
- return GetBDKPath('user_data', *relpath_args)
+ """Find a path relative to the user data dir."""
+ # User data is stored at <bdk>/user_data.
+ return GetBDKPath('user_data', *relpath_args)
def GetBDKVersion():
- """Find the BDK version"""
- # Cache the version.
- # TODO(wad)(b/25952145) make this less hacky.
- if vars().get('bdk_version') is not None:
- return vars().get('bdk_version')
- version_path = GetBDKPath('VERSION')
- version = 0 # Default version.
- with open(version_path, 'r') as f:
- version = f.readline().strip()
- vars()['bdk_version'] = version
- return version
+ """Find the BDK version"""
+ # Cache the version.
+ # TODO(wad)(b/25952145) make this less hacky.
+ if vars().get('bdk_version') is not None:
+ return vars().get('bdk_version')
+ version_path = GetBDKPath('VERSION')
+ version = 0 # Default version.
+ with open(version_path, 'r') as f:
+ version = f.readline().strip()
+ vars()['bdk_version'] = version
+ return version
def GetOSVersion():
- """Find the OS version"""
- # TODO(b/27653682): Separate versions for OS & BDK.
- # Note: this function will be removed once OSes are downloaded like
- # BSPs. At that point, code should check the manifest for available
- # versions.
- return GetBDKVersion()
+ """Find the OS version"""
+ # TODO(b/27653682): Separate versions for OS & BDK.
+ # Note: this function will be removed once OSes are downloaded like
+ # BSPs. At that point, code should check the manifest for available
+ # versions.
+ return GetBDKVersion()
def GetOSPath(os_version, *relpath_args):
- """Find a path relative to the base of an OS.
+ """Find a path relative to the base of an OS.
- Raises:
- OSVersionError: The designated OS is not installed.
- """
- if os_version != GetOSVersion():
- raise OSVersionError('Brillo {} is not installed '
- '(installed version is: {}, the BDK does not yet '
- 'support installing additional versions).'.format(
- os_version, GetOSVersion()))
- # For now, the BDK is actually under the OS at <os>/tools/bdk,
- # so we can figure out where the OS is based on that.
- return GetBDKPath('..', '..', *relpath_args)
+ Raises:
+ OSVersionError: The designated OS is not installed.
+ """
+ if os_version != GetOSVersion():
+ raise OSVersionError('Brillo {} is not installed '
+ '(installed version is: {}, the BDK does not yet '
+ 'support installing additional versions).'.format(
+ os_version, GetOSVersion()))
+ # For now, the BDK is actually under the OS at <os>/tools/bdk,
+ # so we can figure out where the OS is based on that.
+ return GetBDKPath('..', '..', *relpath_args)
def DEPRECATED_GetDefaultOSPath(*relpath):
- """DEPRECATED - use GetOSPath.
+ """DEPRECATED - use GetOSPath.
- For use by legacy brunch commands without a target os version.
- """
- return GetOSPath(GetOSVersion(), *relpath)
+ For use by legacy brunch commands without a target os version.
+ """
+ return GetOSPath(GetOSVersion(), *relpath)
def GetProductDir():
- """Walks from cwd upward to find the product root"""
- p = os.getcwd()
- while p != '/':
- if os.path.isfile(os.path.join(p, ANDROID_PRODUCTS_MK)):
- return p
- p = os.path.dirname(p)
- return None
+ """Walks from cwd upward to find the product root"""
+ p = os.getcwd()
+ while p != '/':
+ if os.path.isfile(os.path.join(p, ANDROID_PRODUCTS_MK)):
+ return p
+ p = os.path.dirname(p)
+ return None
def GetAndroidProductOut(platform_build_out, board):
- """Returns the ANDROID_PRODUCT_OUT directory.
+ """Returns the ANDROID_PRODUCT_OUT directory.
- Args:
- platform_build_out: root directory of the platform build output.
- board: board name.
+ Args:
+ platform_build_out: root directory of the platform build output.
+ board: board name.
- Returns:
- Path to the product build output directory.
- """
- return os.path.join(platform_build_out, 'target', 'product', board)
+ Returns:
+ Path to the product build output directory.
+ """
+ return os.path.join(platform_build_out, 'target', 'product', board)
def FindProjectSpec():
- """Walks from cwd upward to find the project spec."""
- p = os.getcwd()
- while p != '/':
- f = os.path.join(p, PROJECT_SPEC_FILENAME)
- if os.path.isfile(f):
- return f
- p = os.path.dirname(p)
- return None
+ """Walks from cwd upward to find the project spec."""
+ p = os.getcwd()
+ while p != '/':
+ f = os.path.join(p, PROJECT_SPEC_FILENAME)
+ if os.path.isfile(f):
+ return f
+ p = os.path.dirname(p)
+ return None
def AsShellArgs(args):
- # TODO(wad)(b/25952900) we should sanitize beyond just quotes.
- return ' '.join(['"%s"' % a.replace('"', '\\"') for a in args])
+ # TODO(wad)(b/25952900) we should sanitize beyond just quotes.
+ return ' '.join(['"%s"' % a.replace('"', '\\"') for a in args])
def GetExitCode(status):
- """Convert an os.wait status code to a shell return value"""
- if os.WIFEXITED(status):
- return os.WEXITSTATUS(status)
- if os.WIFSTOPPED(status):
- return 128+os.WSTOPSIG(status)
- if os.WIFSIGNALED(status):
- return 128+os.WTERMSIG(status)
- return 1
+ """Convert an os.wait status code to a shell return value"""
+ if os.WIFEXITED(status):
+ return os.WEXITSTATUS(status)
+ if os.WIFSTOPPED(status):
+ return 128+os.WSTOPSIG(status)
+ if os.WIFSIGNALED(status):
+ return 128+os.WTERMSIG(status)
+ return 1
def GetUserInput(prompt):
- """Prompt a user for input and return the result.
+ """Prompt a user for input and return the result.
- Also pauses all known running timers, so that their
- time doesn't include user thinking time.
- """
- print prompt
- sys.stdout.flush()
- timer.Timer.MassPauseRunningTimers()
- result = sys.stdin.readline()
- timer.Timer.ResumeMassPausedTimers()
- return result
+ Also pauses all known running timers, so that their
+ time doesn't include user thinking time.
+ """
+ print prompt
+ sys.stdout.flush()
+ timer.Timer.MassPauseRunningTimers()
+ result = sys.stdin.readline()
+ timer.Timer.ResumeMassPausedTimers()
+ return result
def GetHostArch():
- """Returns the Android name of the host architecture.
-
- Raises:
- HostError: host architecture is not supported.
- """
- if os.uname()[0].lower() != 'linux':
- raise HostUnsupportedArchError('The BDK only supports Linux machines.')
- return 'linux-x86'
+ """Returns the Android name of the host architecture.
+
+ Raises:
+ HostUnsupportedArchError: host architecture is not supported.
+ """
+ if os.uname()[0].lower() != 'linux':
+ raise HostUnsupportedArchError('The BDK only supports Linux machines.')
+ return 'linux-x86'
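
GetProductDir() and FindProjectSpec() above share the same upward walk; the
only difference is the marker file they look for. Here is a standalone
restatement of that walk (the helper name _find_upward is illustrative and
not part of util.py):

    import os

    def _find_upward(filename, start):
        # Climb from `start` toward '/' and return the first directory that
        # contains `filename`, or None if nothing is found.
        path = start
        while path != '/':
            if os.path.isfile(os.path.join(path, filename)):
                return path
            path = os.path.dirname(path)
        return None

    # e.g. _find_upward('AndroidProducts.mk', os.getcwd()) gives the product
    # root, matching the closest-ancestor behaviour the unit tests check.
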
diff --git a/cli/lib/core/util_stub.py b/cli/lib/core/util_stub.py
index ff15be2..45bd773 100644
--- a/cli/lib/core/util_stub.py
+++ b/cli/lib/core/util_stub.py
@@ -24,72 +24,73 @@ import error
class StubUtil(object):
- """Stubs for our core.util module."""
-
- class Error(error.Error):
- pass
-
- class HostUnsupportedArchError(Error):
- pass
-
- class OSVersionError(Error):
- pass
-
- PROJECT_SPEC_FILENAME = 'unittest_project_spec.xml'
-
- def __init__(self, bdk_path='/some/bdk/path',
- user_data_path='/user/data/path', os_path='/os/path',
- bdk_version='99.99', os_version='88.88', arch='fake_arch'):
- self.bdk_path = bdk_path
- self.user_data_path = user_data_path
- self.os_path = os_path
- self.bdk_version = bdk_version
- self.os_version = os_version
- self.arch = arch
- self.arch_is_supported = True
- self.project_spec = None
-
- def GetBDKPath(self, *relpath):
- return os.path.join(self.bdk_path, *relpath)
-
- def GetUserDataPath(self, *relpath):
- return os.path.join(self.user_data_path, *relpath)
-
- def DEPRECATED_GetDefaultOSPath(self, *relpath):
- return os.path.join(self.os_path, *relpath)
-
- def GetOSPath(self, version, *relpath):
- if version != self.os_version:
- raise self.OSVersionError('Invalid OS version: {} (expected {})'.format(
- version, self.os_version))
- return os.path.join(self.os_path, *relpath)
-
- def GetBDKVersion(self):
- return self.bdk_version
-
- def GetOSVersion(self):
- return self.os_version
-
- @staticmethod
- def GetAndroidProductOut(platform_out, board):
- return util.GetAndroidProductOut(platform_out, board)
-
- @classmethod
- def AsShellArgs(cls, ary):
- return util.AsShellArgs(ary)
-
- @classmethod
- def GetExitCode(cls, retval):
- if retval == 0:
- return 0
- else:
- return 1
-
- def GetHostArch(self):
- if not self.arch_is_supported:
- raise self.HostUnsupportedArchError(
- '{} is not supported.'.format(self.arch))
- return self.arch
-
- def FindProjectSpec(self):
- return self.project_spec
+ """Stubs for our core.util module."""
+
+ class Error(error.Error):
+ pass
+
+ class HostUnsupportedArchError(Error):
+ pass
+
+ class OSVersionError(Error):
+ pass
+
+ PROJECT_SPEC_FILENAME = 'unittest_project_spec.xml'
+
+ def __init__(self, bdk_path='/some/bdk/path',
+ user_data_path='/user/data/path', os_path='/os/path',
+ bdk_version='99.99', os_version='88.88', arch='fake_arch'):
+ self.bdk_path = bdk_path
+ self.user_data_path = user_data_path
+ self.os_path = os_path
+ self.bdk_version = bdk_version
+ self.os_version = os_version
+ self.arch = arch
+ self.arch_is_supported = True
+ self.project_spec = None
+
+ def GetBDKPath(self, *relpath):
+ return os.path.join(self.bdk_path, *relpath)
+
+ def GetUserDataPath(self, *relpath):
+ return os.path.join(self.user_data_path, *relpath)
+
+ def DEPRECATED_GetDefaultOSPath(self, *relpath):
+ return os.path.join(self.os_path, *relpath)
+
+ def GetOSPath(self, version, *relpath):
+ if version != self.os_version:
+ raise self.OSVersionError(
+ 'Invalid OS version: {} (expected {})'.format(version,
+ self.os_version))
+ return os.path.join(self.os_path, *relpath)
+
+ def GetBDKVersion(self):
+ return self.bdk_version
+
+ def GetOSVersion(self):
+ return self.os_version
+
+ @staticmethod
+ def GetAndroidProductOut(platform_out, board):
+ return util.GetAndroidProductOut(platform_out, board)
+
+ @classmethod
+ def AsShellArgs(cls, ary):
+ return util.AsShellArgs(ary)
+
+ @classmethod
+ def GetExitCode(cls, retval):
+ if retval == 0:
+ return 0
+ else:
+ return 1
+
+ def GetHostArch(self):
+ if not self.arch_is_supported:
+ raise self.HostUnsupportedArchError(
+ '{} is not supported.'.format(self.arch))
+ return self.arch
+
+ def FindProjectSpec(self):
+ return self.project_spec
diff --git a/cli/lib/core/util_unittest.py b/cli/lib/core/util_unittest.py
index 399e194..71ef2a4 100644
--- a/cli/lib/core/util_unittest.py
+++ b/cli/lib/core/util_unittest.py
@@ -25,97 +25,102 @@ from test import stubs
class UtilTest(unittest.TestCase):
- """Tests for the Util functions."""
-
- def setUp(self):
- self.stub_os = stubs.StubOs()
- self.stub_open = stubs.StubOpen(self.stub_os)
-
- util.os = self.stub_os
- util.open = self.stub_open.open
-
- def test_bdk_path(self):
- # We use startswith since it might also be .pyc if unit tests have been
- # previously run.
- self.assertTrue(__file__.startswith(
- util.GetBDKPath('cli', 'lib', 'core', 'util_unittest.py')))
-
- def test_bdk_version(self):
- self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('123.45')
- self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
- self.assertEqual(util.GetBDKVersion(), '123.45')
-
- def test_os_version(self):
- self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('67.89')
- self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
- self.assertEqual(util.GetOSVersion(), '67.89')
-
- def test_os_path(self):
- self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('01.23')
- self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
- # We use startswith since it might also be .pyc if unit tests have been
- # previously run.
- self.assertTrue(__file__.startswith(
- util.GetOSPath('01.23', 'tools', 'bdk', 'cli', 'lib', 'core',
- 'util_unittest.py')))
- # Error should complain about requested version.
- with self.assertRaisesRegexp(util.OSVersionError, '4.5'):
- util.GetOSPath('4.5')
-
- def test_deprecated_default_os_path(self):
- self.stub_open.files[util.GetBDKPath('VERSION')] = stubs.StubFile('01.23')
- self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
- # We use startswith since it might also be .pyc if unit tests have been
- # previously run.
- self.assertTrue(__file__.startswith(
- util.DEPRECATED_GetDefaultOSPath('tools', 'bdk', 'cli', 'lib', 'core',
- 'util_unittest.py')))
-
- def test_get_product_dir(self):
- self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
- self.stub_os.path.should_exist = [
- self.stub_os.path.join('/long', util.ANDROID_PRODUCTS_MK),
- self.stub_os.path.join('/long/way/down/to/top',
- util.ANDROID_PRODUCTS_MK),
- self.stub_os.path.join('/long/way', util.ANDROID_PRODUCTS_MK)]
- # Make sure we find the closest in our direct path.
- self.assertEqual(util.GetProductDir(), '/long/way')
-
- def test_get_no_product_dir(self):
- self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
- self.stub_os.path.should_exist = [
- self.stub_os.path.join('/other', util.ANDROID_PRODUCTS_MK)]
- self.assertEqual(util.GetProductDir(), None)
-
- def test_get_project_spec(self):
- self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
- self.stub_os.path.should_exist = [
- self.stub_os.path.join('/long', util.PROJECT_SPEC_FILENAME),
- self.stub_os.path.join('/long/way/down/to/top',
- util.PROJECT_SPEC_FILENAME),
- self.stub_os.path.join('/long/way', util.PROJECT_SPEC_FILENAME)]
- # Make sure we find the closest in our direct path.
- self.assertEqual(util.FindProjectSpec(),
- self.stub_os.path.join('/long/way',
- util.PROJECT_SPEC_FILENAME))
-
- def test_get_no_project_spec(self):
- self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
- self.stub_os.path.should_exist = [
- self.stub_os.path.join('/other', util.PROJECT_SPEC_FILENAME)]
- self.assertEqual(util.FindProjectSpec(), None)
-
- def test_android_product_out(self):
- self.assertEqual(util.GetAndroidProductOut('platform/dir/', 'board_name'),
- 'platform/dir/target/product/board_name')
-
- def test_get_host(self):
- self.stub_os.SetUname(('Linux', 'host_name.website.com', '99.99-awesome',
- 'stuff', 'x86_64'))
- self.assertEqual(util.GetHostArch(), 'linux-x86')
-
- def test_get_bad_host(self):
- self.stub_os.SetUname(('OSX', 'host_name.website.com', '99.99-awesome',
- 'stuff', 'x86_64'))
- with self.assertRaises(util.HostUnsupportedArchError):
- util.GetHostArch()
+ """Tests for the Util functions."""
+
+ def setUp(self):
+ self.stub_os = stubs.StubOs()
+ self.stub_open = stubs.StubOpen(self.stub_os)
+
+ util.os = self.stub_os
+ util.open = self.stub_open.open
+
+ def test_bdk_path(self):
+ # We use startswith since it might also be .pyc if unit tests have been
+ # previously run.
+ self.assertTrue(__file__.startswith(
+ util.GetBDKPath('cli', 'lib', 'core', 'util_unittest.py')))
+
+ def test_bdk_version(self):
+ self.stub_open.files[util.GetBDKPath('VERSION')] = (
+ stubs.StubFile('123.45'))
+ self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
+ self.assertEqual(util.GetBDKVersion(), '123.45')
+
+ def test_os_version(self):
+ self.stub_open.files[util.GetBDKPath('VERSION')] = (
+ stubs.StubFile('67.89'))
+ self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
+ self.assertEqual(util.GetOSVersion(), '67.89')
+
+ def test_os_path(self):
+ self.stub_open.files[util.GetBDKPath('VERSION')] = (
+ stubs.StubFile('01.23'))
+ self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
+ # We use startswith since it might also be .pyc if unit tests have been
+ # previously run.
+ self.assertTrue(__file__.startswith(
+ util.GetOSPath('01.23', 'tools', 'bdk', 'cli', 'lib', 'core',
+ 'util_unittest.py')))
+ # Error should complain about requested version.
+ with self.assertRaisesRegexp(util.OSVersionError, '4.5'):
+ util.GetOSPath('4.5')
+
+ def test_deprecated_default_os_path(self):
+ self.stub_open.files[util.GetBDKPath('VERSION')] = (
+ stubs.StubFile('01.23'))
+ self.stub_os.path.should_exist = [util.GetBDKPath('VERSION')]
+ # We use startswith since it might also be .pyc if unit tests have been
+ # previously run.
+ self.assertTrue(__file__.startswith(
+ util.DEPRECATED_GetDefaultOSPath('tools', 'bdk', 'cli', 'lib',
+ 'core', 'util_unittest.py')))
+
+ def test_get_product_dir(self):
+ self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
+ self.stub_os.path.should_exist = [
+ self.stub_os.path.join('/long', util.ANDROID_PRODUCTS_MK),
+ self.stub_os.path.join('/long/way/down/to/top',
+ util.ANDROID_PRODUCTS_MK),
+ self.stub_os.path.join('/long/way', util.ANDROID_PRODUCTS_MK)]
+ # Make sure we find the closest in our direct path.
+ self.assertEqual(util.GetProductDir(), '/long/way')
+
+ def test_get_no_product_dir(self):
+ self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
+ self.stub_os.path.should_exist = [
+ self.stub_os.path.join('/other', util.ANDROID_PRODUCTS_MK)]
+ self.assertEqual(util.GetProductDir(), None)
+
+ def test_get_project_spec(self):
+ self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
+ self.stub_os.path.should_exist = [
+ self.stub_os.path.join('/long', util.PROJECT_SPEC_FILENAME),
+ self.stub_os.path.join('/long/way/down/to/top',
+ util.PROJECT_SPEC_FILENAME),
+ self.stub_os.path.join('/long/way', util.PROJECT_SPEC_FILENAME)]
+ # Make sure we find the closest in our direct path.
+ self.assertEqual(util.FindProjectSpec(),
+ self.stub_os.path.join('/long/way',
+ util.PROJECT_SPEC_FILENAME))
+
+ def test_get_no_project_spec(self):
+ self.stub_os.cwd = '/long/way/down/to/the/bottom/turtle/'
+ self.stub_os.path.should_exist = [
+ self.stub_os.path.join('/other', util.PROJECT_SPEC_FILENAME)]
+ self.assertEqual(util.FindProjectSpec(), None)
+
+ def test_android_product_out(self):
+ self.assertEqual(
+ util.GetAndroidProductOut('platform/dir/', 'board_name'),
+ 'platform/dir/target/product/board_name')
+
+ def test_get_host(self):
+ self.stub_os.SetUname(('Linux', 'host_name.website.com',
+ '99.99-awesome', 'stuff', 'x86_64'))
+ self.assertEqual(util.GetHostArch(), 'linux-x86')
+
+ def test_get_bad_host(self):
+ self.stub_os.SetUname(('OSX', 'host_name.website.com', '99.99-awesome',
+ 'stuff', 'x86_64'))
+ with self.assertRaises(util.HostUnsupportedArchError):
+ util.GetHostArch()
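
All of the unit tests in these files lean on the same stubbing pattern: the
modules under test look up os, open, util, subprocess and friends as
module-level names, so setUp() can swap them for stub objects and exercise
the code without touching the real system. A minimal sketch of that pattern
against core.util, assuming cli/lib is on the Python path so core.util is
importable:

    from core import util

    class _StubOs(object):
        def uname(self):
            # Pretend to be a Linux host so GetHostArch() succeeds.
            return ('Linux', 'stub-host', '1.0', 'stuff', 'x86_64')

    _real_os = util.os        # keep the real module so it can be restored
    util.os = _StubOs()       # inject the stub, as the tests' setUp() does
    try:
        assert util.GetHostArch() == 'linux-x86'
    finally:
        util.os = _real_os    # always restore the real os module
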