diff options
author | Steve Fung <stevefung@google.com> | 2016-04-18 18:16:02 -0700 |
---|---|---|
committer | Steve Fung <stevefung@google.com> | 2016-04-19 07:58:29 +0000 |
commit | f574c6a64c36b12469873e82ab0ae4872059aebb (patch) | |
tree | 8cec5e55a84c3f89168e871c60c771cb2e8f1cef | |
parent | 8dbd8e8e133e3b1493c3a6ff0e6614f8669345ae (diff) | |
download | bdk-f574c6a64c36b12469873e82ab0ae4872059aebb.tar.gz |
Convert bsp/ to 4 space indent
Convert all files in the bsp/ folder to a 4 space indent
to comply with PEP8 style rules.
Bug: 28007659
Test: `python test_runner.py` passes.
Test: pylint passes.
Change-Id: I9ed93ee77acccaa6b4b30a2a2fe4b9ab02546a35
-rw-r--r-- | cli/lib/bsp/device.py | 775 | ||||
-rw-r--r-- | cli/lib/bsp/device_stub.py | 36 | ||||
-rw-r--r-- | cli/lib/bsp/device_unittest.py | 862 | ||||
-rw-r--r-- | cli/lib/bsp/manifest.py | 192 | ||||
-rw-r--r-- | cli/lib/bsp/manifest_stub.py | 56 | ||||
-rw-r--r-- | cli/lib/bsp/manifest_unittest.py | 238 | ||||
-rw-r--r-- | cli/lib/bsp/package.py | 772 | ||||
-rw-r--r-- | cli/lib/bsp/package_stub.py | 190 | ||||
-rw-r--r-- | cli/lib/bsp/package_unittest.py | 1107 | ||||
-rw-r--r-- | cli/lib/bsp/status.py | 26 | ||||
-rw-r--r-- | cli/lib/bsp/subpackage.py | 52 | ||||
-rw-r--r-- | cli/lib/bsp/subpackage_unittest.py | 58 |
12 files changed, 2196 insertions, 2168 deletions
diff --git a/cli/lib/bsp/device.py b/cli/lib/bsp/device.py index 19c5732..d6ad014 100644 --- a/cli/lib/bsp/device.py +++ b/cli/lib/bsp/device.py @@ -34,415 +34,420 @@ DEVICE_KEY_ARCH = 'arch' class Error(error.Error): - description = 'Error with Device' + description = 'Error with Device' class LicenseError(Error): - """Raised when there is an error with licensing.""" - description = 'Error approving licenses' + """Raised when there is an error with licensing.""" + description = 'Error approving licenses' class PackageDownloadError(Error): - """Raised when a package for the device has trouble downloading.""" - description = 'Error downloading package' + """Raised when a package for the device has trouble downloading.""" + description = 'Error downloading package' class PackageLinkError(Error): - """Raised when a subpackage for the device fails to link properly.""" - description = 'Failed to link BSP package to OS tree' + """Raised when a subpackage for the device fails to link properly.""" + description = 'Failed to link BSP package to OS tree' class PackageUnlinkError(Error): - """Raised when a subpackage for the device fails to unlink properly.""" - description = 'Failed to unlink BSP from OS tree' + """Raised when a subpackage for the device fails to unlink properly.""" + description = 'Failed to unlink BSP from OS tree' class _LinkContext(object): - """A context manager to allow temporary linking of a device to an OS. - - Attributes: - device: The device to temporarily link. - os_version: The os version to temporarily link to - """ - - def __init__(self, device, os_version): - self.device = device - self.os_version = os_version - - def __enter__(self): - success = False - try: - self.device.link(self.os_version) - success = True - finally: - # If something goes wrong, leave unlinked. - if not success: - self.cleanup() - - def __exit__(self, type_, value_, traceback_): - # TODO(b/28028440): Even if you were linked initially, when - # exiting the context the links will be removed. - self.cleanup() - - def cleanup(self): - self.device.unlink(self.os_version) - - -class Device(object): - """Class to represent devices with BSPs available. - - Attributes: - name: the name of the package. - vendor: the vendor of the device. - arch: the architecture of the device. - version: the version of the device. - _package_map: a mapping of {package.Package: {subpackage_name: path}}, - informing which subpackages of which packages this device requires, - and where in the OS it expects them to be installed - (path is relative to OS root). - """ - - def __init__(self, name, vendor, arch, package_map): - """Initializes a Device. - - Args: - name: the name of the device. - vendor: the vendor of the device. - arch: the architecture of the device. - package_map: a mapping of - {package.Package: {subpackage_name: relative_path}}, - where relative_path is relative to the BDK root. - """ - self.name = name - self.vendor = vendor - self.arch = arch - self.version = '0.0.0' - self._package_map = package_map - - def status(self, os_version='', verbose=False): - """Checks the status of a device. - - Args: - os_version: (optional) The os version to check the status of this - device against. If empty, ignore LINKED/UNRECOGNIZED; - only worry about INSTALLED/MISSING/NOT_INSTALLED. - verbose: (optional) If True, also return details of subpackages. - - Returns: - (status, status string), where where status is the highest status - of any subpackage that is part of the device BSP, or status.INSTALLED - if the BSP has no subpackages. The status string lists the device and - human-readable status, and in verbose mode also includes the same for - each subpackage. - """ - device_status = status.INSTALLED - packages_string = '' - for (pkg, subpackage_map) in self._package_map.iteritems(): - packages_string += '\n * {0}'.format(pkg.name) - for (subpackage, relpath) in subpackage_map.iteritems(): - subpackage_path = '' - if os_version: - subpackage_path = util.GetOSPath(os_version, relpath) - (subpackage_status, subpackage_string) = pkg.subpackage_status( - subpackage, subpackage_path) - packages_string += '\n * {0}'.format(subpackage_string) - device_status = max(device_status, subpackage_status) - - device_string = '{} {} - {}'.format(self.name, self.version, - status.status_string(device_status)) - if verbose: - device_string += packages_string - - return (device_status, device_string) - - def is_available(self): - """Checks whether a device is available for use. - - Returns: - True if the BSP status is status.INSTALLED, - False otherwise. - """ - return self.status()[0] == status.INSTALLED + """A context manager to allow temporary linking of a device to an OS. - def link(self, os_version): - """Links an OS to this device. - - Does not overwrite anything; ignores situations where it would need to. - - Args: - os_version: The OS to link to this device. - - Raises: - PackageLinkError: if a package problem occurs while linking. - util.OSVersionError: if the OS version is invalid. + Attributes: + device: The device to temporarily link. + os_version: The os version to temporarily link to. """ - errors = [] - for (pkg, subpackage_map) in self._package_map.iteritems(): - for (subpackage_name, relpath) in subpackage_map.iteritems(): - link = util.GetOSPath(os_version, relpath) - # Don't overwrite anything. - if os.path.exists(link): - continue - - try: - pkg.link_subpackage(subpackage_name, link) - except package.Error as e: - errors.append('package {}: {}'.format(pkg.name, e)) - # Error with the whole package, not just a single subpackage, - # don't bother with the rest of the package. - if isinstance(e, package.NotDownloadedError): - break - if errors: - raise PackageLinkError('OS version {}: {}'.format(os_version, errors)) + def __init__(self, device, os_version): + self.device = device + self.os_version = os_version - def unlink(self, os_version): - """Unlinks this device from an OS. - - Removes links of LINKED subpackages. - - Args: - os_version: The OS to unlink from. - - Raises: - PackageUnlinkError: if a package problem occurs while unlinking. - util.OSVersionError: if the OS version is invalid. - """ - errors = [] - for (pkg, subpackage_map) in self._package_map.iteritems(): - for (subpackage, relpath) in subpackage_map.iteritems(): - link = util.GetOSPath(os_version, relpath) + def __enter__(self): + success = False try: - subpackage_status, _ = pkg.subpackage_status( - subpackage, link) - if subpackage_status == status.LINKED: - pkg.unlink_subpackage(subpackage, link) - except package.Error as e: - errors.append('{}: {}'.format(pkg, e)) - if errors: - raise PackageUnlinkError('OS version {}: {}'.format( - os_version, errors)) - - def linked(self, os_version): - """A context manager for temporary linkage. - - Note: b/28028440: It is a known bug that after exiting the context, - the device will be unlinked, whether or not it was linked going in. - - Args: - os_version: The OS version to temporarily link against. - - Returns: - A context manager within which the BSP and OS will be linked. - """ - return _LinkContext(self, os_version) + self.device.link(self.os_version) + success = True + finally: + # If something goes wrong, leave unlinked. + if not success: + self.cleanup() + + def __exit__(self, type_, value_, traceback_): + # TODO(b/28028440): Even if you were linked initially, when + # exiting the context the links will be removed. + self.cleanup() - def unrecognized_paths(self, os_version): - """Get the paths of unrecognized subpackages of this device. + def cleanup(self): + self.device.unlink(self.os_version) - Args: - os_version: The OS version to check for unrecognized paths in. - Returns: - A list of paths of unrecognized subpackages of this device. - """ - result = [] - for (pkg, subpackage_map) in self._package_map.iteritems(): - for (subpackage, relpath) in subpackage_map.iteritems(): - subpackage_path = util.GetOSPath(os_version, relpath) - status_, _ = pkg.subpackage_status(subpackage, subpackage_path) - if status_ == status.UNRECOGNIZED: - result.append(subpackage_path) - return result - - def match_tarball(self, tarball): - """Matches a tarball to its corresponding package. - - Args: - tarball: a path to a tarball file. - - Returns: - The name of the matching package, if any. - None otherwise. - - Raises: - package.VersionError, if there is a problem reading the version. +class Device(object): + """Class to represent devices with BSPs available. + + Attributes: + name: the name of the package. + vendor: the vendor of the device. + arch: the architecture of the device. + version: the version of the device. + _package_map: a mapping of {package.Package: {subpackage_name: path}}, + informing which subpackages of which packages this device requires, + and where in the OS it expects them to be installed + (path is relative to OS root). """ - tar_hash = package.TarPackage.get_tarball_version(tarball) - for pkg in self._package_map: - if tar_hash.startswith(pkg.version): - return pkg.name - - return None - def _prompt_licenses(self, auto_accept=False): - """Helper - finds and prompts all license files interactively. - - Args: - auto_accept: (optional) If true, still prints a list of licenses, - but automatically accepts them. - - Returns: - True if licenses were accepted, False otherwise. - - Raises: - LicenseError: if there is a problem reading a license. - package.NoSuchSubpackageError: if the package map is not correctly - validated and specifies a non-existant subpackage. - """ - # Find all the licenses. - licenses = [] - for (pkg, subpackage_map) in self._package_map.iteritems(): - licenses += pkg.get_licenses(subpackage_map.keys()) - - # If there aren't any, great! - num_licenses = len(licenses) - if num_licenses == 0: - return True - - # Write a prompt. - prompt = ( - 'This package contains software provided by a third party ' - 'with different license terms. Please read and make sure you ' - 'understand each of the following licenses before choosing accept.\n' - ) - for i in range(num_licenses): - prompt += '{}. {}\n'.format(i+1, licenses[i].name) - if num_licenses == 1: - prompt += '(1) View License\n' - else: - prompt += '(1-{}) View License\n'.format(num_licenses) - prompt += ('(A) Accept All\n' - '(C) Cancel') - - # Ask the user to accept. - confirmed = auto_accept or None - while confirmed is None: - choice = util.GetUserInput(prompt).strip().upper() - if choice.isdigit(): - choice = int(choice) - if choice > 0 and choice <= num_licenses: - # View license. - try: - with open(licenses[choice - 1].path) as f: - print f.read() - except IOError as e: - raise LicenseError('failed to read license {} ({}): {}'.format( - choice, licenses[choice - 1].name, e)) + def __init__(self, name, vendor, arch, package_map): + """Initializes a Device. + + Args: + name: the name of the device. + vendor: the vendor of the device. + arch: the architecture of the device. + package_map: a mapping of + {package.Package: {subpackage_name: relative_path}}, + where relative_path is relative to the BDK root. + """ + self.name = name + self.vendor = vendor + self.arch = arch + self.version = '0.0.0' + self._package_map = package_map + + def status(self, os_version='', verbose=False): + """Checks the status of a device. + + Args: + os_version: (optional) The os version to check the status of this + device against. If empty, ignore LINKED/UNRECOGNIZED; + only worry about INSTALLED/MISSING/NOT_INSTALLED. + verbose: (optional) If True, also return details of subpackages. + + Returns: + (status, status string), where where status is the highest status + of any subpackage that is part of the device BSP, or + status.INSTALLED if the BSP has no subpackages. The status string + lists the device and human-readable status, and in verbose mode + also includes the same for each subpackage. + """ + device_status = status.INSTALLED + packages_string = '' + for (pkg, subpackage_map) in self._package_map.iteritems(): + packages_string += '\n * {0}'.format(pkg.name) + for (subpackage, relpath) in subpackage_map.iteritems(): + subpackage_path = '' + if os_version: + subpackage_path = util.GetOSPath(os_version, relpath) + (subpackage_status, subpackage_string) = pkg.subpackage_status( + subpackage, subpackage_path) + packages_string += '\n * {0}'.format(subpackage_string) + device_status = max(device_status, subpackage_status) + + device_string = '{} {} - {}'.format(self.name, self.version, + status.status_string(device_status)) + if verbose: + device_string += packages_string + + return (device_status, device_string) + + def is_available(self): + """Checks whether a device is available for use. + + Returns: + True if the BSP status is status.INSTALLED, + False otherwise. + """ + return self.status()[0] == status.INSTALLED + + def link(self, os_version): + """Links an OS to this device. + + Does not overwrite anything; ignores situations where it would need to. + + Args: + os_version: The OS to link to this device. + + Raises: + PackageLinkError: if a package problem occurs while linking. + util.OSVersionError: if the OS version is invalid. + """ + errors = [] + for (pkg, subpackage_map) in self._package_map.iteritems(): + for (subpackage_name, relpath) in subpackage_map.iteritems(): + link = util.GetOSPath(os_version, relpath) + # Don't overwrite anything. + if os.path.exists(link): + continue + + try: + pkg.link_subpackage(subpackage_name, link) + except package.Error as e: + errors.append('package {}: {}'.format(pkg.name, e)) + # Error with the whole package, not just a single + # subpackage, don't bother with the rest of the package. + if isinstance(e, package.NotDownloadedError): + break + + if errors: + raise PackageLinkError('OS version {}: {}'.format(os_version, + errors)) + + def unlink(self, os_version): + """Unlinks this device from an OS. + + Removes links of LINKED subpackages. + + Args: + os_version: The OS to unlink from. + + Raises: + PackageUnlinkError: if a package problem occurs while unlinking. + util.OSVersionError: if the OS version is invalid. + """ + errors = [] + for (pkg, subpackage_map) in self._package_map.iteritems(): + for (subpackage, relpath) in subpackage_map.iteritems(): + link = util.GetOSPath(os_version, relpath) + try: + subpackage_status, _ = pkg.subpackage_status( + subpackage, link) + if subpackage_status == status.LINKED: + pkg.unlink_subpackage(subpackage, link) + except package.Error as e: + errors.append('{}: {}'.format(pkg, e)) + if errors: + raise PackageUnlinkError('OS version {}: {}'.format(os_version, + errors)) + + def linked(self, os_version): + """A context manager for temporary linkage. + + Note: b/28028440: It is a known bug that after exiting the context, + the device will be unlinked, whether or not it was linked going in. + + Args: + os_version: The OS version to temporarily link against. + + Returns: + A context manager within which the BSP and OS will be linked. + """ + return _LinkContext(self, os_version) + + def unrecognized_paths(self, os_version): + """Get the paths of unrecognized subpackages of this device. + + Args: + os_version: The OS version to check for unrecognized paths in. + + Returns: + A list of paths of unrecognized subpackages of this device. + """ + result = [] + for (pkg, subpackage_map) in self._package_map.iteritems(): + for (subpackage, relpath) in subpackage_map.iteritems(): + subpackage_path = util.GetOSPath(os_version, relpath) + status_, _ = pkg.subpackage_status(subpackage, subpackage_path) + if status_ == status.UNRECOGNIZED: + result.append(subpackage_path) + return result + + def match_tarball(self, tarball): + """Matches a tarball to its corresponding package. + + Args: + tarball: a path to a tarball file. + + Returns: + The name of the matching package, if any. + None otherwise. + + Raises: + package.VersionError, if there is a problem reading the version. + """ + tar_hash = package.TarPackage.get_tarball_version(tarball) + for pkg in self._package_map: + if tar_hash.startswith(pkg.version): + return pkg.name + + return None + + def _prompt_licenses(self, auto_accept=False): + """Helper - finds and prompts all license files interactively. + + Args: + auto_accept: (optional) If true, still prints a list of licenses, + but automatically accepts them. + + Returns: + True if licenses were accepted, False otherwise. + + Raises: + LicenseError: if there is a problem reading a license. + package.NoSuchSubpackageError: if the package map is not correctly + validated and specifies a non-existant subpackage. + """ + # Find all the licenses. + licenses = [] + for (pkg, subpackage_map) in self._package_map.iteritems(): + licenses += pkg.get_licenses(subpackage_map.keys()) + + # If there aren't any, great! + num_licenses = len(licenses) + if num_licenses == 0: + return True + + # Write a prompt. + prompt = ( + 'This package contains software provided by a third party ' + 'with different license terms. Please read and make sure you ' + 'understand each of the following licenses before choosing ' + 'accept.\n' + ) + for i in range(num_licenses): + prompt += '{}. {}\n'.format(i+1, licenses[i].name) + if num_licenses == 1: + prompt += '(1) View License\n' else: - # Invalid number. - if num_licenses == 1: - print 'The only license available to view is 1.' - else: - print 'Please specify a license number from 1-{}.'.format( - num_licenses) - elif choice == 'A': - confirmed = True - elif choice == 'C': - confirmed = False - else: - print ('Unrecognized option "{}". Please specify "A", "C", ' - 'or a license number to view.'.format(choice)) - - return confirmed - - def install(self, extract_only=None, auto_accept=False, link_os_version='', - verbose=False): - """Installs the BSP for this device. - - Args: - extract_only: (optional) a map of { package_name : tarball_file }, - for packages to skip the tarball download step. - auto_accept: (optional) True to accept all licenses automatically. - link_os_version: (optional) An OS version to link packages into. - Intended only for when developing the OS/BSP, not for use with - product development flow. - verbose: (optional) If True, print status messages. Default False. - - Returns: - True if the installation is successful, False otherwise. - - Raises: - LicenseError: if the user does not accept all licenses. - PackageLinkError: if a package fails to link to the tree. - (Will not occur if link_os_version is None or empty) - PackageDownloadError: if a required bsp.Package fails to download. - package.NoSuchSubpackageError: if the package map is not correctly - validated and specifies a non-existant subpackage. - """ - extract_only = extract_only or {} - downloaded = [] - licenses_confirmed = False - - try: - # Download all missing packages, tracking which these are. - for pkg in self._package_map: - if not pkg.is_downloaded(): - if verbose: - print 'Downloading {}...'.format(pkg.name) - try: - pkg.download(tarball=extract_only.get(pkg.name)) - downloaded.append(pkg) - except package.Error as e: - raise PackageDownloadError('{}: {}'.format(pkg.name, e)) - elif verbose: - print '{} already downloaded...'.format(pkg.name) - - # Find all licenses and give the interactive prompt. - licenses_confirmed = self._prompt_licenses(auto_accept=auto_accept) - # If licenses weren't confirmed, we're done. - if not licenses_confirmed: - raise LicenseError('user did not accept all licenses.') - finally: - # If anything failed, remove what we downloaded. - if not licenses_confirmed: - for pkg in downloaded: - pkg.uninstall() - - # If requested, go ahead and install the links. - if link_os_version: - self.link(link_os_version) - - def uninstall(self): - """Uninstalls all BSP packages for a device. - - Raises: - OSError: if there is an unexpected problem uninstalling. - """ - for pkg in self._package_map: - pkg.uninstall() - - @classmethod - def from_dict(cls, dic, packages): - """Create a Device from a dict. - - Merges together the name-based package identification with - assembled Package objects. + prompt += '(1-{}) View License\n'.format(num_licenses) + prompt += ('(A) Accept All\n' + '(C) Cancel') + + # Ask the user to accept. + confirmed = auto_accept or None + while confirmed is None: + choice = util.GetUserInput(prompt).strip().upper() + if choice.isdigit(): + choice = int(choice) + if choice > 0 and choice <= num_licenses: + # View license. + try: + with open(licenses[choice - 1].path) as f: + print f.read() + except IOError as e: + raise LicenseError( + 'failed to read license {} ({}): {}'.format( + choice, licenses[choice - 1].name, e)) + else: + # Invalid number. + if num_licenses == 1: + print 'The only license available to view is 1.' + else: + print ('Please specify a license number from ' + '1-{}.'.format(num_licenses)) + elif choice == 'A': + confirmed = True + elif choice == 'C': + confirmed = False + else: + print ('Unrecognized option "{}". Please specify "A", "C", ' + 'or a license number to view.'.format(choice)) + + return confirmed + + def install(self, extract_only=None, auto_accept=False, link_os_version='', + verbose=False): + """Installs the BSP for this device. + + Args: + extract_only: (optional) a map of { package_name : tarball_file }, + for packages to skip the tarball download step. + auto_accept: (optional) True to accept all licenses automatically. + link_os_version: (optional) An OS version to link packages into. + Intended only for when developing the OS/BSP, not for use with + product development flow. + verbose: (optional) If True, print status messages. Default False. + + Returns: + True if the installation is successful, False otherwise. + + Raises: + LicenseError: if the user does not accept all licenses. + PackageLinkError: if a package fails to link to the tree. + (Will not occur if link_os_version is None or empty) + PackageDownloadError: if a required bsp.Package fails to download. + package.NoSuchSubpackageError: if the package map is not correctly + validated and specifies a non-existant subpackage. + """ + extract_only = extract_only or {} + downloaded = [] + licenses_confirmed = False - Args: - dic: the dictionary to build the device from. - packages: a dictionary mapping { package_name : Package } - - Returns: - A Device generated from |dic| and |packages|. - - Raises: - KeyError: An expected key is missing. - ValueError: A non-existant package or subpackage name is used. - """ - name = dic[DEVICE_KEY_NAME] - package_map = {} - for (package_name, subpackage_map) in dic[DEVICE_KEY_PACKAGES].iteritems(): - pkg = packages.get(package_name) - if not pkg: - raise ValueError( - 'Package {0} for {1} does not exist.'.format(package_name, name)) - for subpackage in subpackage_map: - if subpackage not in pkg.subpackages: - raise ValueError( - 'Subpackage {0}.{1} for {2} does not exist.'.format( - package_name, subpackage, name)) - package_map[pkg] = subpackage_map - - return cls(name, dic[DEVICE_KEY_VENDOR], dic[DEVICE_KEY_ARCH], - package_map) + try: + # Download all missing packages, tracking which these are. + for pkg in self._package_map: + if not pkg.is_downloaded(): + if verbose: + print 'Downloading {}...'.format(pkg.name) + try: + pkg.download(tarball=extract_only.get(pkg.name)) + downloaded.append(pkg) + except package.Error as e: + raise PackageDownloadError('{}: {}'.format(pkg.name, e)) + elif verbose: + print '{} already downloaded...'.format(pkg.name) + + # Find all licenses and give the interactive prompt. + licenses_confirmed = self._prompt_licenses(auto_accept=auto_accept) + # If licenses weren't confirmed, we're done. + if not licenses_confirmed: + raise LicenseError('user did not accept all licenses.') + finally: + # If anything failed, remove what we downloaded. + if not licenses_confirmed: + for pkg in downloaded: + pkg.uninstall() + + # If requested, go ahead and install the links. + if link_os_version: + self.link(link_os_version) + + def uninstall(self): + """Uninstalls all BSP packages for a device. + + Raises: + OSError: if there is an unexpected problem uninstalling. + """ + for pkg in self._package_map: + pkg.uninstall() + + @classmethod + def from_dict(cls, dic, packages): + """Create a Device from a dict. + + Merges together the name-based package identification with + assembled Package objects. + + Args: + dic: the dictionary to build the device from. + packages: a dictionary mapping { package_name : Package } + + Returns: + A Device generated from |dic| and |packages|. + + Raises: + KeyError: An expected key is missing. + ValueError: A non-existant package or subpackage name is used. + """ + name = dic[DEVICE_KEY_NAME] + package_map = {} + for (package_name, subpackage_map) in ( + dic[DEVICE_KEY_PACKAGES].iteritems()): + pkg = packages.get(package_name) + if not pkg: + raise ValueError( + 'Package {0} for {1} does not exist.'.format(package_name, + name)) + for subpackage in subpackage_map: + if subpackage not in pkg.subpackages: + raise ValueError( + 'Subpackage {0}.{1} for {2} does not exist.'.format( + package_name, subpackage, name)) + package_map[pkg] = subpackage_map + + return cls(name, dic[DEVICE_KEY_VENDOR], dic[DEVICE_KEY_ARCH], + package_map) diff --git a/cli/lib/bsp/device_stub.py b/cli/lib/bsp/device_stub.py index 30701b8..257aac2 100644 --- a/cli/lib/bsp/device_stub.py +++ b/cli/lib/bsp/device_stub.py @@ -24,21 +24,21 @@ from test import stubs class StubDevice(object): - def __init__(self, name='', vendor='', arch='', package_map=None, - version='', downloaded=False, should_link_version=None): - self.name = name - self.vendor = vendor - self.arch = arch - self.version = version - self.package_map = package_map or {} - self.should_link_version = should_link_version - self.downloaded = downloaded - - def linked(self, os_version): - if not self.should_link_version == os_version: - raise device.Error('Not supposed to link to {} (only to {})'.format( - os_version, self.should_link_version)) - return stubs.StubContextManager() - - def is_available(self): - return self.downloaded + def __init__(self, name='', vendor='', arch='', package_map=None, + version='', downloaded=False, should_link_version=None): + self.name = name + self.vendor = vendor + self.arch = arch + self.version = version + self.package_map = package_map or {} + self.should_link_version = should_link_version + self.downloaded = downloaded + + def linked(self, os_version): + if not self.should_link_version == os_version: + raise device.Error('Not supposed to link to {} (only to {})'.format( + os_version, self.should_link_version)) + return stubs.StubContextManager() + + def is_available(self): + return self.downloaded diff --git a/cli/lib/bsp/device_unittest.py b/cli/lib/bsp/device_unittest.py index 096735f..247602f 100644 --- a/cli/lib/bsp/device_unittest.py +++ b/cli/lib/bsp/device_unittest.py @@ -29,433 +29,439 @@ from test import stubs class DeviceTest(unittest.TestCase): - _OS_VERSION = '98.76' - - def setUp(self): - self.dev_json = { - 'device_name': 'Test Device 1', - 'vendor': 'test_vendor', - 'arch': 'test_arch', - 'packages': { - 'package_1': { - 'subpackage_1': 'path/to/link' + _OS_VERSION = '98.76' + + def setUp(self): + self.dev_json = { + 'device_name': 'Test Device 1', + 'vendor': 'test_vendor', + 'arch': 'test_arch', + 'packages': { + 'package_1': { + 'subpackage_1': 'path/to/link' + } } } - } - self.package1 = package_stub.StubPackage('package_1', subpackages={ - 'subpackage_1': status.INSTALLED, - 'subpackage_2': status.INSTALLED - }, licenses=['license1', 'license2']) - self.package2 = package_stub.StubPackage('package_2', subpackages={ - 'subpackage_2': status.INSTALLED - }) - self.packages = {'package_1': self.package1, 'package_2': self.package2} - - self.stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(self.stub_os) - self.stub_hashlib = stubs.StubHashlib() - self.stub_tar_package = package_stub.StubTarPackage('tarball_version') - self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION) - - device.os = self.stub_os - device.open = self.stub_open.open - device.hashlib = self.stub_hashlib - device.util = self.stub_util - device.package = package_stub.StubPackageModule(self.stub_tar_package) - - self.dev = device.Device('test_device', 'test_vendor', 'test_arch', - {self.package1: {'subpackage_1': 'path1', - 'subpackage_2': 'path2'}, - self.package2: {'subpackage_2': 'path3'}}) - - def test_from_dict(self): - dev = device.Device.from_dict(self.dev_json, self.packages) - self.assertIsInstance(dev, device.Device) - # pylint: disable=protected-access - self.assertTrue(self.package1 in dev._package_map) - self.assertEqual(dev._package_map[self.package1]['subpackage_1'], - 'path/to/link') - - def test_from_dict_bad_package(self): - self.dev_json['packages']['nonexistent'] = {} - with self.assertRaisesRegexp(ValueError, - 'Package.*nonexistent.*does not exist'): - device.Device.from_dict(self.dev_json, self.packages) - - def test_from_dict_bad_subpackage(self): - self.dev_json['packages']['package_1']['nonexistent'] = {} - with self.assertRaisesRegexp(ValueError, - 'Sub.*nonexistent.*does not exist'): - device.Device.from_dict(self.dev_json, self.packages) - - def test_from_dict_missing_keys(self): - del self.dev_json['arch'] - with self.assertRaisesRegexp(KeyError, 'arch'): - device.Device.from_dict(self.dev_json, self.packages) - - self.dev_json['arch'] = 'test_arch' - del self.dev_json['vendor'] - with self.assertRaisesRegexp(KeyError, 'vendor'): - device.Device.from_dict(self.dev_json, self.packages) - - self.dev_json['vendor'] = 'test_vendor' - del self.dev_json['packages'] - with self.assertRaisesRegexp(KeyError, 'packages'): - device.Device.from_dict(self.dev_json, self.packages) - - def test_status_installed(self): - self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.INSTALLED) - self.assertEqual(self.dev.status()[0], status.INSTALLED) - - def test_status_linked(self): - self.package1.subpackages['subpackage_1'] = status.LINKED - self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.LINKED) - self.assertEqual(self.dev.status()[0], status.INSTALLED) - - def test_status_unrecognized(self): - self.package1.subpackages['subpackage_1'] = status.LINKED - self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED - # Also test that this beats out linked. - self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.UNRECOGNIZED) - # In certain situations, this could also be missing, but that's more - # complex to code into stubs. - self.assertEqual(self.dev.status()[0], status.INSTALLED) - - def test_status_missing(self): - self.package1.subpackages['subpackage_1'] = status.LINKED - self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED - self.package2.subpackages['subpackage_2'] = status.MISSING - # Also test that this beats out linked and unrecognized. - self.assertEqual(self.dev.status()[0], status.MISSING) - self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.MISSING) - - - def test_status_not_installed(self): - self.package1.subpackages['subpackage_1'] = status.NOT_INSTALLED - self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED - self.package2.subpackages['subpackage_2'] = status.MISSING - # Also test that this beats out missing and unrecognized - # (and transitively linked). - self.assertEqual(self.dev.status()[0], status.NOT_INSTALLED) - self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.NOT_INSTALLED) - - def test_is_available(self): - self.assertTrue(self.dev.is_available()) - self.package2.subpackages['subpackage_2'] = status.UNRECOGNIZED - self.assertTrue(self.dev.is_available()) - self.package2.subpackages['subpackage_2'] = status.LINKED - self.assertTrue(self.dev.is_available()) - self.package2.subpackages['subpackage_2'] = status.MISSING - self.assertFalse(self.dev.is_available()) - self.package2.subpackages['subpackage_2'] = status.NOT_INSTALLED - self.assertFalse(self.dev.is_available()) - - def test_link(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.should_link['subpackage_1'] = link1 - self.package1.should_link['subpackage_2'] = link2 - self.package2.should_link['subpackage_2'] = link3 - - self.dev.link(self._OS_VERSION) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.LINKED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.LINKED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - def test_link_fail(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.should_link['subpackage_1'] = link1 - # Gonna be problems with package1.subpackage_2 - self.package2.should_link['subpackage_2'] = link3 - - with self.assertRaises(device.PackageLinkError): - self.dev.link(self._OS_VERSION) - # Should still link what it can. - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.LINKED) - self.assertNotEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.LINKED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - def test_link_not_downloaded(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.should_link['subpackage_1'] = link1 - self.package1.should_link['subpackage_2'] = link2 - self.package2.should_link['subpackage_2'] = link3 - self.package2.downloaded = False - - with self.assertRaises(device.PackageLinkError): - self.dev.link(self._OS_VERSION) - # Should still link what it can. - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.LINKED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.LINKED) - self.assertNotEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - def test_link_overwrite(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.should_link['subpackage_1'] = link1 - self.stub_os.path.should_exist = [link2] - self.package2.should_link['subpackage_2'] = link3 - - # Shouldn't raise, but shouldn't link package1.subpackage_2. - self.dev.link(self._OS_VERSION) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.LINKED) - self.assertNotEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.LINKED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - def test_unlink(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.subpackages['subpackage_1'] = status.LINKED - self.package1.subpackages['subpackage_2'] = status.LINKED - self.package2.subpackages['subpackage_2'] = status.LINKED - self.package1.should_unlink['subpackage_1'] = link1 - self.package1.should_unlink['subpackage_2'] = link2 - self.package2.should_unlink['subpackage_2'] = link3 - - self.dev.unlink(self._OS_VERSION) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.INSTALLED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.INSTALLED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.INSTALLED) - - def test_partial_unlink(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.subpackages['subpackage_1'] = status.MISSING - self.package1.subpackages['subpackage_2'] = status.INSTALLED - self.package2.subpackages['subpackage_2'] = status.LINKED - self.package1.should_unlink['subpackage_1'] = link1 - self.package1.should_unlink['subpackage_2'] = link2 - self.package2.should_unlink['subpackage_2'] = link3 - - # Should only actually unlink package2.subpackage_2. - self.dev.unlink(self._OS_VERSION) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.MISSING) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.INSTALLED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.INSTALLED) - - def test_failed_unlink(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package2.subpackages['subpackage_2'] = status.LINKED - - # Should fail to unlink package2.subpackage_2. - with self.assertRaises(device.PackageUnlinkError): - self.dev.unlink(self._OS_VERSION) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.INSTALLED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.INSTALLED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - def test_linked(self): - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - self.package1.should_link['subpackage_1'] = link1 - self.package1.should_link['subpackage_2'] = link2 - self.package2.should_link['subpackage_2'] = link3 - self.package1.should_unlink['subpackage_1'] = link1 - self.package1.should_unlink['subpackage_2'] = link2 - self.package2.should_unlink['subpackage_2'] = link3 - - with self.dev.linked(self._OS_VERSION): - # In context, should be linked. - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.LINKED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.LINKED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.LINKED) - - # Out of context, no longer linked. - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', link1)[0], status.INSTALLED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', link2)[0], status.INSTALLED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', link3)[0], status.INSTALLED) - - def test_unrecognized_paths(self): - self.package1.subpackages['subpackage_1'] = status.NOT_INSTALLED - self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED - self.package2.subpackages['subpackage_2'] = status.UNRECOGNIZED - result = self.dev.unrecognized_paths(self._OS_VERSION) - self.assertIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path2'), result) - self.assertIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path3'), result) - self.assertNotIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path1'), - result) - - def test_match_tarball(self): - matching = package.TarPackage('match', None, - self.stub_tar_package.tarball_version, None) - non_matching_tar = package_stub.StubPackage( - 'non_match', None, '<wrong_version>', None) - # While unlikely git and tar hashes would match, it is in theory possible. - # Luckily the branch:hash formulation for git versions should prevent this - # issue. - non_matching_git = package_stub.StubPackage( - 'git', None, '<branch>:' + self.stub_tar_package.tarball_version, None) - dev = device.Device('name', 'vend', 'arch', { - non_matching_tar: {}, matching: {}, non_matching_git: {}}) - self.assertEqual(dev.match_tarball('file1'), 'match') - - def test_non_matching_tarball(self): - non_matching_tar = package.TarPackage( - 'non_match', None, '<wrong_version>', None) - # While unlikely git and tar hashes would match, it is in theory possible. - # Luckily the branch:hash formulation for git versions should prevent this - # issue. - non_matching_git = package.GitPackage( - 'git', None, '<branch>:<correct_version>', None) - self.stub_open.files = {'file1': stubs.StubFile('file1')} - self.stub_os.path.should_exist = ['file1'] - self.stub_hashlib.should_return = '<correct_version>' - dev = device.Device('name', 'vend', 'arch', - {non_matching_tar: {}, non_matching_git: {}}) - self.assertEqual(dev.match_tarball('file1'), None) - - def test_install(self): - self.package1.downloaded = False - self.package2.downloaded = False - self.package1.should_download = True - self.package2.should_download = True - - self.assertFalse(self.dev.is_available()) - self.dev.install(auto_accept=True) - self.assertTrue(self.dev.is_available()) - - def test_partial_install_with_link(self): - self.package1.downloaded = True - self.package2.downloaded = False - link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') - link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') - link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') - - self.package1.should_link['subpackage_1'] = link1 - self.package1.should_link['subpackage_2'] = link2 - self.package2.should_link['subpackage_2'] = link3 - # Package 1 is already downloaded. But it will still link. - self.package1.should_download = False - self.package2.should_download = True - - self.assertFalse(self.dev.is_available()) - self.dev.install(auto_accept=True, link_os_version=self._OS_VERSION) - self.assertTrue(self.dev.is_available()) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_1', self._OS_VERSION)[0], status.LINKED) - self.assertEqual(self.package1.subpackage_status( - 'subpackage_2', self._OS_VERSION)[0], status.LINKED) - self.assertEqual(self.package2.subpackage_status( - 'subpackage_2', self._OS_VERSION)[0], status.LINKED) - - def test_failed_install(self): - self.package1.downloaded = False - self.package2.downloaded = False - self.package1.should_download = True - self.package2.should_download = False - # Since it will have downloaded successfully, it should be cleaned up. - self.package1.should_uninstall = True - - self.assertFalse(self.dev.is_available()) - with self.assertRaises(device.PackageDownloadError): - self.dev.install(auto_accept=True) - self.assertFalse(self.dev.is_available()) - self.assertFalse(self.package1.is_downloaded()) - self.assertFalse(self.package2.is_downloaded()) - - def test_failed_partial_install(self): - self.package1.downloaded = True - self.package2.downloaded = False - self.package1.should_download = False - self.package2.should_download = False - - self.assertFalse(self.dev.is_available()) - with self.assertRaises(device.PackageDownloadError): - self.dev.install(auto_accept=True) - self.assertFalse(self.dev.is_available()) - # Package1 was already available; it shouldn't go away just because - # Package2 failed. - self.assertTrue(self.package1.is_downloaded()) - self.assertFalse(self.package2.is_downloaded()) - - def test_void_install(self): - self.package1.downloaded = True - self.package2.downloaded = True - self.package1.should_download = False - self.package2.should_download = False - self.package1.licenses = [] - - self.assertTrue(self.dev.is_available()) - self.dev.install(auto_accept=False) - self.assertTrue(self.dev.is_available()) - self.assertTrue(self.package1.is_downloaded()) - self.assertTrue(self.package2.is_downloaded()) - - def test_install_failed_link(self): - self.package1.downloaded = False - self.package2.downloaded = False - self.package1.should_download = True - self.package2.should_download = True - - self.assertFalse(self.dev.is_available()) - with self.assertRaises(device.PackageLinkError): - self.dev.install(auto_accept=True, link_os_version=self._OS_VERSION) - # Should still install, just won't be linked. - self.assertTrue(self.dev.is_available()) - self.assertNotEqual(self.package1.subpackage_status('subpackage_1')[0], - status.LINKED) - self.assertNotEqual(self.package1.subpackage_status('subpackage_2')[0], - status.LINKED) - self.assertNotEqual(self.package2.subpackage_status('subpackage_2')[0], - status.LINKED) - - def test_uninstall(self): - self.package1.downloaded = True - self.package2.downloaded = True - self.package1.should_uninstall = True - self.package2.should_uninstall = True - self.assertTrue(self.dev.is_available()) - self.dev.uninstall() - self.assertFalse(self.dev.is_available()) - self.assertFalse(self.package1.is_downloaded()) - self.assertFalse(self.package2.is_downloaded()) - - def test_uninstall_unnecessary(self): - self.package1.downloaded = False - self.package2.downloaded = True - # Still ok to try uninstalling package1. - self.package1.should_uninstall = True - self.package2.should_uninstall = True - self.assertFalse(self.dev.is_available()) - self.dev.uninstall() - self.assertFalse(self.dev.is_available()) - self.assertFalse(self.package1.is_downloaded()) - self.assertFalse(self.package2.is_downloaded()) + self.package1 = package_stub.StubPackage( + 'package_1', + subpackages={'subpackage_1': status.INSTALLED, + 'subpackage_2': status.INSTALLED}, + licenses=['license1', 'license2']) + self.package2 = package_stub.StubPackage( + 'package_2', subpackages={'subpackage_2': status.INSTALLED}) + self.packages = {'package_1': self.package1, 'package_2': self.package2} + + self.stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(self.stub_os) + self.stub_hashlib = stubs.StubHashlib() + self.stub_tar_package = package_stub.StubTarPackage('tarball_version') + self.stub_util = util_stub.StubUtil(os_version=self._OS_VERSION) + + device.os = self.stub_os + device.open = self.stub_open.open + device.hashlib = self.stub_hashlib + device.util = self.stub_util + device.package = package_stub.StubPackageModule(self.stub_tar_package) + + self.dev = device.Device( + 'test_device', 'test_vendor', 'test_arch', + {self.package1: {'subpackage_1': 'path1', 'subpackage_2': 'path2'}, + self.package2: {'subpackage_2': 'path3'}}) + + def test_from_dict(self): + dev = device.Device.from_dict(self.dev_json, self.packages) + self.assertIsInstance(dev, device.Device) + # pylint: disable=protected-access + self.assertTrue(self.package1 in dev._package_map) + self.assertEqual(dev._package_map[self.package1]['subpackage_1'], + 'path/to/link') + + def test_from_dict_bad_package(self): + self.dev_json['packages']['nonexistent'] = {} + with self.assertRaisesRegexp(ValueError, + 'Package.*nonexistent.*does not exist'): + device.Device.from_dict(self.dev_json, self.packages) + + def test_from_dict_bad_subpackage(self): + self.dev_json['packages']['package_1']['nonexistent'] = {} + with self.assertRaisesRegexp(ValueError, + 'Sub.*nonexistent.*does not exist'): + device.Device.from_dict(self.dev_json, self.packages) + + def test_from_dict_missing_keys(self): + del self.dev_json['arch'] + with self.assertRaisesRegexp(KeyError, 'arch'): + device.Device.from_dict(self.dev_json, self.packages) + + self.dev_json['arch'] = 'test_arch' + del self.dev_json['vendor'] + with self.assertRaisesRegexp(KeyError, 'vendor'): + device.Device.from_dict(self.dev_json, self.packages) + + self.dev_json['vendor'] = 'test_vendor' + del self.dev_json['packages'] + with self.assertRaisesRegexp(KeyError, 'packages'): + device.Device.from_dict(self.dev_json, self.packages) + + def test_status_installed(self): + self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.INSTALLED) + self.assertEqual(self.dev.status()[0], status.INSTALLED) + + def test_status_linked(self): + self.package1.subpackages['subpackage_1'] = status.LINKED + self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.LINKED) + self.assertEqual(self.dev.status()[0], status.INSTALLED) + + def test_status_unrecognized(self): + self.package1.subpackages['subpackage_1'] = status.LINKED + self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED + # Also test that this beats out linked. + self.assertEqual(self.dev.status(self._OS_VERSION)[0], + status.UNRECOGNIZED) + # In certain situations, this could also be missing, but that's more + # complex to code into stubs. + self.assertEqual(self.dev.status()[0], status.INSTALLED) + + def test_status_missing(self): + self.package1.subpackages['subpackage_1'] = status.LINKED + self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED + self.package2.subpackages['subpackage_2'] = status.MISSING + # Also test that this beats out linked and unrecognized. + self.assertEqual(self.dev.status()[0], status.MISSING) + self.assertEqual(self.dev.status(self._OS_VERSION)[0], status.MISSING) + + + def test_status_not_installed(self): + self.package1.subpackages['subpackage_1'] = status.NOT_INSTALLED + self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED + self.package2.subpackages['subpackage_2'] = status.MISSING + # Also test that this beats out missing and unrecognized + # (and transitively linked). + self.assertEqual(self.dev.status()[0], status.NOT_INSTALLED) + self.assertEqual(self.dev.status(self._OS_VERSION)[0], + status.NOT_INSTALLED) + + def test_is_available(self): + self.assertTrue(self.dev.is_available()) + self.package2.subpackages['subpackage_2'] = status.UNRECOGNIZED + self.assertTrue(self.dev.is_available()) + self.package2.subpackages['subpackage_2'] = status.LINKED + self.assertTrue(self.dev.is_available()) + self.package2.subpackages['subpackage_2'] = status.MISSING + self.assertFalse(self.dev.is_available()) + self.package2.subpackages['subpackage_2'] = status.NOT_INSTALLED + self.assertFalse(self.dev.is_available()) + + def test_link(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.should_link['subpackage_1'] = link1 + self.package1.should_link['subpackage_2'] = link2 + self.package2.should_link['subpackage_2'] = link3 + + self.dev.link(self._OS_VERSION) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.LINKED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.LINKED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + def test_link_fail(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.should_link['subpackage_1'] = link1 + # Gonna be problems with package1.subpackage_2 + self.package2.should_link['subpackage_2'] = link3 + + with self.assertRaises(device.PackageLinkError): + self.dev.link(self._OS_VERSION) + # Should still link what it can. + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.LINKED) + self.assertNotEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.LINKED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + def test_link_not_downloaded(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.should_link['subpackage_1'] = link1 + self.package1.should_link['subpackage_2'] = link2 + self.package2.should_link['subpackage_2'] = link3 + self.package2.downloaded = False + + with self.assertRaises(device.PackageLinkError): + self.dev.link(self._OS_VERSION) + # Should still link what it can. + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.LINKED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.LINKED) + self.assertNotEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + def test_link_overwrite(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.should_link['subpackage_1'] = link1 + self.stub_os.path.should_exist = [link2] + self.package2.should_link['subpackage_2'] = link3 + + # Shouldn't raise, but shouldn't link package1.subpackage_2. + self.dev.link(self._OS_VERSION) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.LINKED) + self.assertNotEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.LINKED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + def test_unlink(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.subpackages['subpackage_1'] = status.LINKED + self.package1.subpackages['subpackage_2'] = status.LINKED + self.package2.subpackages['subpackage_2'] = status.LINKED + self.package1.should_unlink['subpackage_1'] = link1 + self.package1.should_unlink['subpackage_2'] = link2 + self.package2.should_unlink['subpackage_2'] = link3 + + self.dev.unlink(self._OS_VERSION) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.INSTALLED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.INSTALLED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.INSTALLED) + + def test_partial_unlink(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.subpackages['subpackage_1'] = status.MISSING + self.package1.subpackages['subpackage_2'] = status.INSTALLED + self.package2.subpackages['subpackage_2'] = status.LINKED + self.package1.should_unlink['subpackage_1'] = link1 + self.package1.should_unlink['subpackage_2'] = link2 + self.package2.should_unlink['subpackage_2'] = link3 + + # Should only actually unlink package2.subpackage_2. + self.dev.unlink(self._OS_VERSION) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.MISSING) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.INSTALLED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.INSTALLED) + + def test_failed_unlink(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package2.subpackages['subpackage_2'] = status.LINKED + + # Should fail to unlink package2.subpackage_2. + with self.assertRaises(device.PackageUnlinkError): + self.dev.unlink(self._OS_VERSION) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.INSTALLED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.INSTALLED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + def test_linked(self): + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + self.package1.should_link['subpackage_1'] = link1 + self.package1.should_link['subpackage_2'] = link2 + self.package2.should_link['subpackage_2'] = link3 + self.package1.should_unlink['subpackage_1'] = link1 + self.package1.should_unlink['subpackage_2'] = link2 + self.package2.should_unlink['subpackage_2'] = link3 + + with self.dev.linked(self._OS_VERSION): + # In context, should be linked. + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.LINKED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.LINKED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.LINKED) + + # Out of context, no longer linked. + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', link1)[0], status.INSTALLED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', link2)[0], status.INSTALLED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', link3)[0], status.INSTALLED) + + def test_unrecognized_paths(self): + self.package1.subpackages['subpackage_1'] = status.NOT_INSTALLED + self.package1.subpackages['subpackage_2'] = status.UNRECOGNIZED + self.package2.subpackages['subpackage_2'] = status.UNRECOGNIZED + result = self.dev.unrecognized_paths(self._OS_VERSION) + self.assertIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path2'), + result) + self.assertIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path3'), + result) + self.assertNotIn(self.stub_util.GetOSPath(self._OS_VERSION, 'path1'), + result) + + def test_match_tarball(self): + matching = package.TarPackage('match', None, + self.stub_tar_package.tarball_version, + None) + non_matching_tar = package_stub.StubPackage( + 'non_match', None, '<wrong_version>', None) + # While unlikely git and tar hashes would match, it is in theory + # possible. Luckily the branch:hash formulation for git versions + # should prevent this issue. + non_matching_git = package_stub.StubPackage( + 'git', None, '<branch>:' + self.stub_tar_package.tarball_version, + None) + dev = device.Device('name', 'vend', 'arch', { + non_matching_tar: {}, matching: {}, non_matching_git: {}}) + self.assertEqual(dev.match_tarball('file1'), 'match') + + def test_non_matching_tarball(self): + non_matching_tar = package.TarPackage( + 'non_match', None, '<wrong_version>', None) + # While unlikely git and tar hashes would match, it is in theory + # possible. Luckily the branch:hash formulation for git versions + # should prevent this issue. + non_matching_git = package.GitPackage( + 'git', None, '<branch>:<correct_version>', None) + self.stub_open.files = {'file1': stubs.StubFile('file1')} + self.stub_os.path.should_exist = ['file1'] + self.stub_hashlib.should_return = '<correct_version>' + dev = device.Device('name', 'vend', 'arch', + {non_matching_tar: {}, non_matching_git: {}}) + self.assertEqual(dev.match_tarball('file1'), None) + + def test_install(self): + self.package1.downloaded = False + self.package2.downloaded = False + self.package1.should_download = True + self.package2.should_download = True + + self.assertFalse(self.dev.is_available()) + self.dev.install(auto_accept=True) + self.assertTrue(self.dev.is_available()) + + def test_partial_install_with_link(self): + self.package1.downloaded = True + self.package2.downloaded = False + link1 = self.stub_util.GetOSPath(self._OS_VERSION, 'path1') + link2 = self.stub_util.GetOSPath(self._OS_VERSION, 'path2') + link3 = self.stub_util.GetOSPath(self._OS_VERSION, 'path3') + + self.package1.should_link['subpackage_1'] = link1 + self.package1.should_link['subpackage_2'] = link2 + self.package2.should_link['subpackage_2'] = link3 + # Package 1 is already downloaded. But it will still link. + self.package1.should_download = False + self.package2.should_download = True + + self.assertFalse(self.dev.is_available()) + self.dev.install(auto_accept=True, link_os_version=self._OS_VERSION) + self.assertTrue(self.dev.is_available()) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_1', self._OS_VERSION)[0], status.LINKED) + self.assertEqual(self.package1.subpackage_status( + 'subpackage_2', self._OS_VERSION)[0], status.LINKED) + self.assertEqual(self.package2.subpackage_status( + 'subpackage_2', self._OS_VERSION)[0], status.LINKED) + + def test_failed_install(self): + self.package1.downloaded = False + self.package2.downloaded = False + self.package1.should_download = True + self.package2.should_download = False + # Since it will have downloaded successfully, it should be cleaned up. + self.package1.should_uninstall = True + + self.assertFalse(self.dev.is_available()) + with self.assertRaises(device.PackageDownloadError): + self.dev.install(auto_accept=True) + self.assertFalse(self.dev.is_available()) + self.assertFalse(self.package1.is_downloaded()) + self.assertFalse(self.package2.is_downloaded()) + + def test_failed_partial_install(self): + self.package1.downloaded = True + self.package2.downloaded = False + self.package1.should_download = False + self.package2.should_download = False + + self.assertFalse(self.dev.is_available()) + with self.assertRaises(device.PackageDownloadError): + self.dev.install(auto_accept=True) + self.assertFalse(self.dev.is_available()) + # Package1 was already available; it shouldn't go away just because + # Package2 failed. + self.assertTrue(self.package1.is_downloaded()) + self.assertFalse(self.package2.is_downloaded()) + + def test_void_install(self): + self.package1.downloaded = True + self.package2.downloaded = True + self.package1.should_download = False + self.package2.should_download = False + self.package1.licenses = [] + + self.assertTrue(self.dev.is_available()) + self.dev.install(auto_accept=False) + self.assertTrue(self.dev.is_available()) + self.assertTrue(self.package1.is_downloaded()) + self.assertTrue(self.package2.is_downloaded()) + + def test_install_failed_link(self): + self.package1.downloaded = False + self.package2.downloaded = False + self.package1.should_download = True + self.package2.should_download = True + + self.assertFalse(self.dev.is_available()) + with self.assertRaises(device.PackageLinkError): + self.dev.install(auto_accept=True, link_os_version=self._OS_VERSION) + # Should still install, just won't be linked. + self.assertTrue(self.dev.is_available()) + self.assertNotEqual(self.package1.subpackage_status('subpackage_1')[0], + status.LINKED) + self.assertNotEqual(self.package1.subpackage_status('subpackage_2')[0], + status.LINKED) + self.assertNotEqual(self.package2.subpackage_status('subpackage_2')[0], + status.LINKED) + + def test_uninstall(self): + self.package1.downloaded = True + self.package2.downloaded = True + self.package1.should_uninstall = True + self.package2.should_uninstall = True + self.assertTrue(self.dev.is_available()) + self.dev.uninstall() + self.assertFalse(self.dev.is_available()) + self.assertFalse(self.package1.is_downloaded()) + self.assertFalse(self.package2.is_downloaded()) + + def test_uninstall_unnecessary(self): + self.package1.downloaded = False + self.package2.downloaded = True + # Still ok to try uninstalling package1. + self.package1.should_uninstall = True + self.package2.should_uninstall = True + self.assertFalse(self.dev.is_available()) + self.dev.uninstall() + self.assertFalse(self.dev.is_available()) + self.assertFalse(self.package1.is_downloaded()) + self.assertFalse(self.package2.is_downloaded()) diff --git a/cli/lib/bsp/manifest.py b/cli/lib/bsp/manifest.py index 1dd6d12..f7de742 100644 --- a/cli/lib/bsp/manifest.py +++ b/cli/lib/bsp/manifest.py @@ -32,108 +32,110 @@ MANIFEST_KEY_DEVICES = 'devices' class Error(error.Error): - """General failure.""" + """General failure.""" class UnknownBspError(Error): - """Raised when a requested BSP is not recognized.""" - description = 'Unrecognized BSP name' + """Raised when a requested BSP is not recognized.""" + description = 'Unrecognized BSP name' class Manifest(object): - """Class for Manifests. + """Class for Manifests. - Manifests are just dumb data. + Manifests are just dumb data. - Attributes: - packages: a dictionary of packages ({package_name : Package}). - devices: a dictionary of devices ({short_name : Device}). - """ - - def __init__(self, packages, devices): - self.packages = packages - self.devices = devices - - @classmethod - def from_dict(cls, dic): - """Create a Manifest from a dict. - - Args: - dic: the dictionary to build the Manifest from. - - Returns: - A Manifest created from |dic|. - """ - packages = {} - devices = {} - for (package_name, package_spec) in dic[MANIFEST_KEY_PACKAGES].iteritems(): - try: - packages[package_name] = package.Package.from_dict(package_spec, - package_name) - except KeyError as e: - raise KeyError('package {}: {}'.format(package_name, e)) - - for (short_name, device_spec) in dic[MANIFEST_KEY_DEVICES].iteritems(): - try: - devices[short_name] = device.Device.from_dict(device_spec, - packages) - except KeyError as e: - raise KeyError('device {}: {}'.format(short_name, e)) - - return cls(packages, devices) - - @classmethod - def from_json(cls, manifest_file=DEFAULT_MANIFEST_FILE): - """Reads in a manifest from a json file. - - Args: - manifest_file: the path to the file containing the manifest json. - - Returns: - A manifest.Manifest object based on the file passed in. - - Raises: - IOError: if there are issues opening the file. - ValueError: if the specified file does not parse as valid json, - or is not a valid manifest for reasons other than KeyErrors. - KeyError: if a required manifest key is missing. - """ - - manifest_dic = {} - try: - with open(manifest_file) as f: - manifest_dic = json.load(f) - except IOError as e: - raise IOError('Unable to open bsp manifest file {}: {}.'.format( - manifest_file, e)) - except ValueError as e: - raise ValueError('Could not parse json in ' - 'bsp manifest file {}: {}'.format(manifest_file, e)) - - try: - result = cls.from_dict(manifest_dic) - except KeyError as e: - raise KeyError('Missing value in bsp manifest file {}: {}.'.format( - manifest_file, e)) - except ValueError as e: - raise ValueError('Error in bsp manifest file {}: {}.'.format( - manifest_file, e)) - - return result - - def is_bsp_available(self, bsp): - """Checks that the requested BSP is available for building. - - Args: - bsp: BSP name. - - Returns: - True if the BSP is available, False otherwise. - - Raises: - UnknownBspError: The given BSP name is not a known BSP. + Attributes: + packages: a dictionary of packages ({package_name : Package}). + devices: a dictionary of devices ({short_name : Device}). """ - if bsp not in self.devices: - raise UnknownBspError(bsp) - return self.devices[bsp].is_available() + def __init__(self, packages, devices): + self.packages = packages + self.devices = devices + + @classmethod + def from_dict(cls, dic): + """Create a Manifest from a dict. + + Args: + dic: the dictionary to build the Manifest from. + + Returns: + A Manifest created from |dic|. + """ + packages = {} + devices = {} + for (package_name, package_spec) in ( + dic[MANIFEST_KEY_PACKAGES].iteritems()): + try: + packages[package_name] = package.Package.from_dict(package_spec, + package_name) + except KeyError as e: + raise KeyError('package {}: {}'.format(package_name, e)) + + for (short_name, device_spec) in dic[MANIFEST_KEY_DEVICES].iteritems(): + try: + devices[short_name] = device.Device.from_dict(device_spec, + packages) + except KeyError as e: + raise KeyError('device {}: {}'.format(short_name, e)) + + return cls(packages, devices) + + @classmethod + def from_json(cls, manifest_file=DEFAULT_MANIFEST_FILE): + """Reads in a manifest from a json file. + + Args: + manifest_file: the path to the file containing the manifest json. + + Returns: + A manifest.Manifest object based on the file passed in. + + Raises: + IOError: if there are issues opening the file. + ValueError: if the specified file does not parse as valid json, + or is not a valid manifest for reasons other than KeyErrors. + KeyError: if a required manifest key is missing. + """ + + manifest_dic = {} + try: + with open(manifest_file) as f: + manifest_dic = json.load(f) + except IOError as e: + raise IOError('Unable to open bsp manifest file {}: {}.'.format( + manifest_file, e)) + except ValueError as e: + raise ValueError('Could not parse json in ' + 'bsp manifest file {}: {}'.format(manifest_file, + e)) + + try: + result = cls.from_dict(manifest_dic) + except KeyError as e: + raise KeyError('Missing value in bsp manifest file {}: {}.'.format( + manifest_file, e)) + except ValueError as e: + raise ValueError('Error in bsp manifest file {}: {}.'.format( + manifest_file, e)) + + return result + + def is_bsp_available(self, bsp): + """Checks that the requested BSP is available for building. + + Args: + bsp: BSP name. + + Returns: + True if the BSP is available, False otherwise. + + Raises: + UnknownBspError: The given BSP name is not a known BSP. + """ + if bsp not in self.devices: + raise UnknownBspError(bsp) + + return self.devices[bsp].is_available() diff --git a/cli/lib/bsp/manifest_stub.py b/cli/lib/bsp/manifest_stub.py index ac56902..ed4e50f 100644 --- a/cli/lib/bsp/manifest_stub.py +++ b/cli/lib/bsp/manifest_stub.py @@ -23,37 +23,37 @@ import error class StubManifest(object): - from_json_devices = {} - - def __init__(self, devices): - """devices should be {name: (device, available)}""" - self.devices = {name: device for - (name, (device, _)) in - devices.iteritems()} - self._available = {name: available - for (name, (_, available)) in - devices.iteritems()} - - def is_bsp_available(self, device): - if not device in self.devices: - raise StubManifestGenerator.UnknownBspError( - 'No such device {}, options are {}'.format( - device, self.devices.keys())) - return self._available[device] - - # pylint: disable=unused-argument - @classmethod - def from_json(cls, manifest_file=None, os_path=None): - return cls(cls.from_json_devices) + from_json_devices = {} + + def __init__(self, devices): + """devices should be {name: (device, available)}""" + self.devices = {name: device + for (name, (device, _)) + in devices.iteritems()} + self._available = {name: available + for (name, (_, available)) + in devices.iteritems()} + + def is_bsp_available(self, device): + if not device in self.devices: + raise StubManifestGenerator.UnknownBspError( + 'No such device {}, options are {}'.format( + device, self.devices.keys())) + return self._available[device] + + # pylint: disable=unused-argument + @classmethod + def from_json(cls, manifest_file=None, os_path=None): + return cls(cls.from_json_devices) class StubManifestGenerator(object): - class Error(error.Error): - pass + class Error(error.Error): + pass - class UnknownBspError(Error): - pass + class UnknownBspError(Error): + pass - def __init__(self): - self.Manifest = StubManifest + def __init__(self): + self.Manifest = StubManifest diff --git a/cli/lib/bsp/manifest_unittest.py b/cli/lib/bsp/manifest_unittest.py index a213bad..dca63d9 100644 --- a/cli/lib/bsp/manifest_unittest.py +++ b/cli/lib/bsp/manifest_unittest.py @@ -24,128 +24,128 @@ from bsp import manifest from test import stubs class ManifestTest(unittest.TestCase): - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(self.stub_os) - - self.base_manifest = { - 'packages': { - 'package_1': { - 'package_type': 'git', - 'remote': 'remote', - 'version': 'version', - 'subpackages': { - 'subpackage_1': { - 'subdir': '.', - 'licenses': ['license1', 'license2'] + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(self.stub_os) + + self.base_manifest = { + 'packages': { + 'package_1': { + 'package_type': 'git', + 'remote': 'remote', + 'version': 'version', + 'subpackages': { + 'subpackage_1': { + 'subdir': '.', + 'licenses': ['license1', 'license2'] + } } } - } - }, - 'devices': { - 'device_1': { - 'device_name': 'Test Device 1', - 'vendor': 'test_vendor', - 'arch': 'test_arch', - 'packages': { - 'package_1': { - 'subpackage_1': 'path/to/extract' + }, + 'devices': { + 'device_1': { + 'device_name': 'Test Device 1', + 'vendor': 'test_vendor', + 'arch': 'test_arch', + 'packages': { + 'package_1': { + 'subpackage_1': 'path/to/extract' + } } } } } - } - - manifest.os = self.stub_os - manifest.open = self.stub_open.open - - self.manifest_path = 'manifest' - self.stub_os.path.should_exist.append(self.manifest_path) - - def test_from_dict(self): - man = manifest.Manifest.from_dict(self.base_manifest) - self.assertIsInstance(man, manifest.Manifest) - - def test_from_dict_incomplete_devices(self): - incomplete_manifest = self.base_manifest - incomplete_manifest['devices']['device_2'] = {} - with self.assertRaisesRegexp(KeyError, 'device_2'): - manifest.Manifest.from_dict(incomplete_manifest) - - def test_from_dict_incomplete_packages(self): - incomplete_manifest = self.base_manifest - incomplete_manifest['packages']['package_2'] = {} - with self.assertRaisesRegexp(KeyError, 'package_2'): - manifest.Manifest.from_dict(incomplete_manifest) - - def test_from_dict_missing_devices(self): - incomplete_manifest = self.base_manifest - del incomplete_manifest['devices'] - with self.assertRaises(KeyError): - manifest.Manifest.from_dict(incomplete_manifest) - - def test_from_dict_missing_packages(self): - incomplete_manifest = self.base_manifest - del incomplete_manifest['packages'] - with self.assertRaises(KeyError): - manifest.Manifest.from_dict(incomplete_manifest) - - def test_minimal_manifest(self): - minimal_manifest = {'packages': {}, 'devices': {}} - man = manifest.Manifest.from_dict(minimal_manifest) - self.assertIsInstance(man, manifest.Manifest) - self.assertEqual(len(man.packages), 0) - self.assertEqual(len(man.devices), 0) - - def test_basic_json_manifest(self): - self.stub_open.files[self.manifest_path] = ( - stubs.StubFile(json.dumps(self.base_manifest))) - man = manifest.Manifest.from_json(self.manifest_path) - self.assertIsInstance(man, manifest.Manifest) - - def test_non_json_manifest(self): - self.stub_open.files[self.manifest_path] = stubs.StubFile( - '{ unmatched bracket is invalid json') - with self.assertRaisesRegexp(ValueError, 'json'): - manifest.Manifest.from_json(self.manifest_path) - - def test_incomplete_json_manifest(self): - incomplete_manifest = self.base_manifest - incomplete_manifest['devices']['device_2'] = {} - self.stub_open.files[self.manifest_path] = ( - stubs.StubFile(json.dumps(incomplete_manifest))) - # Hopefully the error indicates that the issue is somewhere in device_2 - with self.assertRaisesRegexp(KeyError, 'device_2'): - manifest.Manifest.from_json(self.manifest_path) - - def test_invalid_json_manifest(self): - invalid_manifest = self.base_manifest - invalid_manifest['devices']['device_1'][ - 'packages']['package_1']['subpackage_2'] = 'path' - self.stub_open.files[self.manifest_path] = ( - stubs.StubFile(json.dumps(invalid_manifest))) - # Error should indicate that subpackage_2 is not a defined subpackage of - # package_1. - with self.assertRaisesRegexp(ValueError, 'subpackage_2'): - manifest.Manifest.from_json(self.manifest_path) - - def test_json_manifest_not_found(self): - self.stub_os.path.should_exist = [] - # Some sort of error about the file. - with self.assertRaisesRegexp(IOError, self.manifest_path): - manifest.Manifest.from_json(self.manifest_path) - - def test_bsp_is_available(self): - # Set up the manifest with stubbed devices. - devices = {'device_1': device_stub.StubDevice(downloaded=True), - 'device_2': device_stub.StubDevice(downloaded=False)} - - man = manifest.Manifest([], devices) - - # Check that an unknown BSP name raises an error. - with self.assertRaises(manifest.UnknownBspError): - man.is_bsp_available('invalid_bsp_name') - - # Check the return values are correct. - self.assertTrue(man.is_bsp_available('device_1')) - self.assertFalse(man.is_bsp_available('device_2')) + + manifest.os = self.stub_os + manifest.open = self.stub_open.open + + self.manifest_path = 'manifest' + self.stub_os.path.should_exist.append(self.manifest_path) + + def test_from_dict(self): + man = manifest.Manifest.from_dict(self.base_manifest) + self.assertIsInstance(man, manifest.Manifest) + + def test_from_dict_incomplete_devices(self): + incomplete_manifest = self.base_manifest + incomplete_manifest['devices']['device_2'] = {} + with self.assertRaisesRegexp(KeyError, 'device_2'): + manifest.Manifest.from_dict(incomplete_manifest) + + def test_from_dict_incomplete_packages(self): + incomplete_manifest = self.base_manifest + incomplete_manifest['packages']['package_2'] = {} + with self.assertRaisesRegexp(KeyError, 'package_2'): + manifest.Manifest.from_dict(incomplete_manifest) + + def test_from_dict_missing_devices(self): + incomplete_manifest = self.base_manifest + del incomplete_manifest['devices'] + with self.assertRaises(KeyError): + manifest.Manifest.from_dict(incomplete_manifest) + + def test_from_dict_missing_packages(self): + incomplete_manifest = self.base_manifest + del incomplete_manifest['packages'] + with self.assertRaises(KeyError): + manifest.Manifest.from_dict(incomplete_manifest) + + def test_minimal_manifest(self): + minimal_manifest = {'packages': {}, 'devices': {}} + man = manifest.Manifest.from_dict(minimal_manifest) + self.assertIsInstance(man, manifest.Manifest) + self.assertEqual(len(man.packages), 0) + self.assertEqual(len(man.devices), 0) + + def test_basic_json_manifest(self): + self.stub_open.files[self.manifest_path] = ( + stubs.StubFile(json.dumps(self.base_manifest))) + man = manifest.Manifest.from_json(self.manifest_path) + self.assertIsInstance(man, manifest.Manifest) + + def test_non_json_manifest(self): + self.stub_open.files[self.manifest_path] = stubs.StubFile( + '{ unmatched bracket is invalid json') + with self.assertRaisesRegexp(ValueError, 'json'): + manifest.Manifest.from_json(self.manifest_path) + + def test_incomplete_json_manifest(self): + incomplete_manifest = self.base_manifest + incomplete_manifest['devices']['device_2'] = {} + self.stub_open.files[self.manifest_path] = ( + stubs.StubFile(json.dumps(incomplete_manifest))) + # Hopefully the error indicates that the issue is somewhere in device_2 + with self.assertRaisesRegexp(KeyError, 'device_2'): + manifest.Manifest.from_json(self.manifest_path) + + def test_invalid_json_manifest(self): + invalid_manifest = self.base_manifest + invalid_manifest['devices']['device_1'][ + 'packages']['package_1']['subpackage_2'] = 'path' + self.stub_open.files[self.manifest_path] = ( + stubs.StubFile(json.dumps(invalid_manifest))) + # Error should indicate that subpackage_2 is not a defined subpackage of + # package_1. + with self.assertRaisesRegexp(ValueError, 'subpackage_2'): + manifest.Manifest.from_json(self.manifest_path) + + def test_json_manifest_not_found(self): + self.stub_os.path.should_exist = [] + # Some sort of error about the file. + with self.assertRaisesRegexp(IOError, self.manifest_path): + manifest.Manifest.from_json(self.manifest_path) + + def test_bsp_is_available(self): + # Set up the manifest with stubbed devices. + devices = {'device_1': device_stub.StubDevice(downloaded=True), + 'device_2': device_stub.StubDevice(downloaded=False)} + + man = manifest.Manifest([], devices) + + # Check that an unknown BSP name raises an error. + with self.assertRaises(manifest.UnknownBspError): + man.is_bsp_available('invalid_bsp_name') + + # Check the return values are correct. + self.assertTrue(man.is_bsp_available('device_1')) + self.assertFalse(man.is_bsp_available('device_2')) diff --git a/cli/lib/bsp/package.py b/cli/lib/bsp/package.py index 427675e..4723833 100644 --- a/cli/lib/bsp/package.py +++ b/cli/lib/bsp/package.py @@ -26,7 +26,6 @@ from bsp import status from bsp import subpackage from core import tool from core import user_config -from core import util import error @@ -44,429 +43,440 @@ DEFAULT_TAR_FILE = 'tarball.tar.gz' class Error(error.Error): - description = 'Error in Package' + description = 'Error in Package' class DownloadError(Error): - """Raised when a Package fails to download.""" - description = 'Failed to download package' + """Raised when a Package fails to download.""" + description = 'Failed to download package' class RemoteError(DownloadError): - """Raised when there is a problem checking the remote version of a Package.""" - description = 'Trouble getting remote version' + """Raised when there is a problem checking the remote version of a + Package. + """ + description = 'Trouble getting remote version' class VersionError(DownloadError): - """Raised when the package source is not/doesn't have the correct version.""" - description = 'Failed to get correct version' + """Raised when the package source is not/doesn't have the correct + version. + """ + description = 'Failed to get correct version' class UnzipError(DownloadError): - """Raised when the downloaded tarball fails to unzip correctly.""" - description = 'Failed to unzip tarball' + """Raised when the downloaded tarball fails to unzip correctly.""" + description = 'Failed to unzip tarball' class MissingDirectoryError(Error): - """Raised when an expected directory is missing.""" - description = 'Directory is missing' + """Raised when an expected directory is missing.""" + description = 'Directory is missing' class NotDownloadedError(Error): - """Raised when a package isn't downloaded but needs to be.""" - description = 'Package is not downloaded' + """Raised when a package isn't downloaded but needs to be.""" + description = 'Package is not downloaded' class NoSuchSubpackageError(Error): - """Raised when a non-existant subpackage is requested.""" - description = 'No such subpackage' + """Raised when a non-existant subpackage is requested.""" + description = 'No such subpackage' class PathBlockedError(Error): - """Raised when a path name is taken.""" - description = 'Unknown file/dir in the way. Must be manually removed' + """Raised when a path name is taken.""" + description = 'Unknown file/dir in the way. Must be manually removed' class SubpackageLinkError(Error): - """Raised when there is trouble linking a subpackage.""" - description = 'Trouble linking subpackage' + """Raised when there is trouble linking a subpackage.""" + description = 'Trouble linking subpackage' class SubpackageUnlinkError(Error): - """Raised when there is trouble unlinking a subpackage.""" - description = 'Trouble unlinking subpackage' + """Raised when there is trouble unlinking a subpackage.""" + description = 'Trouble unlinking subpackage' License = namedtuple('License', ('name', 'path')) class Package(object): - """Class for packages. - - Attributes: - name: the name of the package. - remote: the remote location of the package. - version: the version of the package. - subpackages: A dictionary of subpackages ({ subpackage_name : Subpackage }). - directory: (read-only) the local directory for the package. - """ - - def __init__(self, name, remote, version, subpackages): - self.name = name - self.remote = remote - self.version = version - self.subpackages = subpackages - - @property - def directory(self): - return os.path.join(user_config.USER_CONFIG.bsp_dir, - BSP_PACKAGE_DIR_FORMAT.format( - name=self.name, source_version=self.version)) - - def _get_subpackage(self, name): - """Gets a subpackage of the given name. - - Args: - name: The name of the subpackage to get. - - Returns: - The desired subpackage. - - Raises: - NoSuchSubpackageError: If no subpackage with the desired name exists. - """ - subpackage_ = self.subpackages.get(name) - if not subpackage_: - raise NoSuchSubpackageError(name) - return subpackage_ - - def is_downloaded(self): - return os.path.isdir(self.directory) - - def subpackage_status(self, name, path=''): - """Checks the status of a given subpackage. - - Args: - name: the name of the subpackage to check status of. ValueError if - self has no subpackage matching the given name. - path: (optional) The path where the subpackage is linked from. - If empty, do not check for UNRECOGNIZED/LINKED. - - Returns: - (status, status_string) pair, where status is, in order: - status.UNRECOGNIZED: if the given path exists. - status.NOT_INSTALLED: if the package isn't downloaded. - status.MISSING: if the subpackage subdir isn't a dir. - status.LINKED: if the subpackage subdir is present and - linked to from the given path. - status.INSTALLED: if the package is downloaded and the subdir present. + """Class for packages. + + Attributes: + name: the name of the package. + remote: the remote location of the package. + version: the version of the package. + subpackages: A dictionary of subpackages + ({ subpackage_name : Subpackage }). + directory: (read-only) the local directory for the package. """ - subpackage_ = self._get_subpackage(name) - subdir = os.path.join(self.directory, subpackage_.subdir) - subdir_is_dir = os.path.isdir(subdir) - if os.path.exists(path): # '' does not exist. - if (os.path.islink(path) and - subdir_is_dir and - os.path.samefile(path, subdir)): - result = status.LINKED - else: - result = status.UNRECOGNIZED - elif not self.is_downloaded(): - result = status.NOT_INSTALLED - elif not os.path.isdir(subdir): - result = status.MISSING - else: - # It is downloaded and exists. - result = status.INSTALLED - - return (result, '{} - {}'.format(name, status.status_string(result))) - - def download(self, tarball=None): - """Downloads the package files (or just extracts them from a tarball). - - Only downloads if the package is not already downloaded. - - Args: - tarball: (optional) if provided, replaces the download part of tar - download and skips straight to unzipping. - - Raises: - DownloadError: (or a child thereof) if the package fails to complete - all steps of the download process (downloading, verifying version, - etc.). - PathBlockedError: if there is an unknown file/dir at the - target download location. - ValueError: if a tarball is passed to a git package. - """ - if self.is_downloaded(): - return - - if os.path.exists(self.directory): - raise PathBlockedError('Intended download location {}.'.format( - self.directory)) - - success = False - try: - os.makedirs(self.directory) - self._type_specific_download(tarball) - success = True - finally: - if not success: - self.uninstall() - - def uninstall(self): - """Uninstalls a package, removing the download. - - Note that this may result in dead links in tree if the package was linked. - - Raises: - OSError: if there is an unexpected problem uninstalling. - """ - if self.is_downloaded(): - shutil.rmtree(self.directory) - - def _type_specific_download(self, tarball=None): - """Do package type specific downloading. - Args: - tarball: (optional) if provided, replaces the download part of tar - download and skips straight to unzipping. - - Raises: - ValueError: If a tarball is passed to a git package. - DownloadError: (or a subclass thereof) if there is some complication - downloading. - """ - raise NotImplementedError( - 'Download function should be overridden in child class.') - - def get_licenses(self, subpackage_names): - """Get all licenses for all subpackages. - - Args: - subpackage_names: a list of names of subpackages to get licenses of. - - Returns: - A list of License namedtuples. - - Raises: - NoSuchSubpackageError: If any of the subpackages are missing. - """ - result = [] - subpackages = [self._get_subpackage(name) - for name - in subpackage_names] - for s in subpackages: - result += [License('{}.{} - {}'.format(self.name, s.name, path), - os.path.join(self.directory, s.subdir, path)) - for path - in s.licenses] - - return result - - def link_subpackage(self, name, path): - """Links a subpackage into the BDK. - - Args: - name: the name of the subpackage to link. ValueError if - self has no subpackage matching the given name. - path: the path where the subpackage should be linked from. - - Raises: - SubpackageLinkError: if the subpackage fails to link. - MissingDirectoryError: if the subpackage's subdir is missing. - NoSuchSubpackageError: if name isn't a valid subpackage name. - NotDownloadedError: if the package isn't downloaded. - """ - if not self.is_downloaded(): - raise NotDownloadedError() - - subpackage_ = self._get_subpackage(name) - subdir = os.path.join(self.directory, subpackage_.subdir) - if not os.path.isdir(subdir): - raise MissingDirectoryError('{} for subpackage {}'.format( - subpackage_.subdir, subpackage_.name)) - - try: - parent_dir = os.path.dirname(path) - if parent_dir and not os.path.isdir(parent_dir): - os.makedirs(parent_dir) - os.symlink(subdir, path) - except OSError as e: - raise SubpackageLinkError( - '{} subdir {}: {}'.format( - subpackage_.name, subpackage_.subdir, e)) - - def unlink_subpackage(self, name, path): - """Unlinks a given subpackage. - - The subpackage must be linked to start with. - - Args: - name: the name of the subpackage to unlink. - path: the path where the subpackage is linked from. - - Raises: - NoSuchSubpackageError: if self has no subpackage matching the given name. - SubpackageUnlinkError: if there is trouble unlinking. - """ - (status_, _) = self.subpackage_status(name, path) - if status_ == status.LINKED: - try: - os.remove(path) - except OSError as e: - raise SubpackageUnlinkError( - '{} (linked from {}): {}.'.format( - name, path, e)) - else: - raise SubpackageUnlinkError('{} is not linked from {}.'.format( - name, path)) - - @classmethod - def from_dict(cls, dic, name): - """Create a Package from a dict. - - Args: - dic: the dictionary to build the package from. - name: the name of the package. - - Returns: - A Package with the given name generated from |dic|. - - Raises: - KeyError: If a required key is missing. - ValueError: If an invalid value is in dic. - """ - package_type = dic[PACKAGE_KEY_TYPE] - package_subclass = None - if package_type == GIT_PACKAGE_TYPE: - package_subclass = GitPackage - elif package_type == TAR_PACKAGE_TYPE: - package_subclass = TarPackage - else: - raise ValueError('Package type must be either {} or {}.'.format( - GIT_PACKAGE_TYPE, TAR_PACKAGE_TYPE)) - - subpackages = {} - for (subpackage_name, data) in dic[PACKAGE_KEY_SUBPACKAGES].iteritems(): - try: - subpackages[subpackage_name] = subpackage.Subpackage.from_dict( - data, subpackage_name) - except KeyError as e: - raise KeyError('subpackage {}: {}'.format(name, e)) - - return package_subclass(name, - dic[PACKAGE_KEY_REMOTE], - dic[PACKAGE_KEY_VERSION], - subpackages) + def __init__(self, name, remote, version, subpackages): + self.name = name + self.remote = remote + self.version = version + self.subpackages = subpackages + + @property + def directory(self): + return os.path.join(user_config.USER_CONFIG.bsp_dir, + BSP_PACKAGE_DIR_FORMAT.format( + name=self.name, source_version=self.version)) + + def _get_subpackage(self, name): + """Gets a subpackage of the given name. + + Args: + name: The name of the subpackage to get. + + Returns: + The desired subpackage. + + Raises: + NoSuchSubpackageError: If no subpackage with the desired name + exists. + """ + subpackage_ = self.subpackages.get(name) + if not subpackage_: + raise NoSuchSubpackageError(name) + return subpackage_ + + def is_downloaded(self): + return os.path.isdir(self.directory) + + def subpackage_status(self, name, path=''): + """Checks the status of a given subpackage. + + Args: + name: the name of the subpackage to check status of. ValueError if + self has no subpackage matching the given name. + path: (optional) The path where the subpackage is linked from. + If empty, do not check for UNRECOGNIZED/LINKED. + + Returns: + (status, status_string) pair, where status is, in order: + status.UNRECOGNIZED: if the given path exists. + status.NOT_INSTALLED: if the package isn't downloaded. + status.MISSING: if the subpackage subdir isn't a dir. + status.LINKED: if the subpackage subdir is present and + linked to from the given path. + status.INSTALLED: if the package is downloaded and the subdir + present. + """ + subpackage_ = self._get_subpackage(name) + subdir = os.path.join(self.directory, subpackage_.subdir) + subdir_is_dir = os.path.isdir(subdir) + if os.path.exists(path): # '' does not exist. + if (os.path.islink(path) and + subdir_is_dir and + os.path.samefile(path, subdir)): + result = status.LINKED + else: + result = status.UNRECOGNIZED + elif not self.is_downloaded(): + result = status.NOT_INSTALLED + elif not os.path.isdir(subdir): + result = status.MISSING + else: + # It is downloaded and exists. + result = status.INSTALLED + + return (result, '{} - {}'.format(name, status.status_string(result))) + + def download(self, tarball=None): + """Downloads the package files (or just extracts them from a tarball). + + Only downloads if the package is not already downloaded. + + Args: + tarball: (optional) if provided, replaces the download part of tar + download and skips straight to unzipping. + + Raises: + DownloadError: (or a child thereof) if the package fails to complete + all steps of the download process (downloading, verifying + version, etc.). + PathBlockedError: if there is an unknown file/dir at the + target download location. + ValueError: if a tarball is passed to a git package. + """ + if self.is_downloaded(): + return + + if os.path.exists(self.directory): + raise PathBlockedError('Intended download location {}.'.format( + self.directory)) + + success = False + try: + os.makedirs(self.directory) + self._type_specific_download(tarball) + success = True + finally: + if not success: + self.uninstall() + + def uninstall(self): + """Uninstalls a package, removing the download. + + Note that this may result in dead links in tree if the package was + linked. + + Raises: + OSError: if there is an unexpected problem uninstalling. + """ + if self.is_downloaded(): + shutil.rmtree(self.directory) + + def _type_specific_download(self, tarball=None): + """Do package type specific downloading. + + Args: + tarball: (optional) if provided, replaces the download part of tar + download and skips straight to unzipping. + + Raises: + ValueError: If a tarball is passed to a git package. + DownloadError: (or a subclass thereof) if there is some complication + downloading. + """ + raise NotImplementedError( + 'Download function should be overridden in child class.') + + def get_licenses(self, subpackage_names): + """Get all licenses for all subpackages. + + Args: + subpackage_names: a list of names of subpackages to get licenses of. + + Returns: + A list of License namedtuples. + + Raises: + NoSuchSubpackageError: If any of the subpackages are missing. + """ + result = [] + subpackages = [self._get_subpackage(name) + for name + in subpackage_names] + for s in subpackages: + result += [License('{}.{} - {}'.format(self.name, s.name, path), + os.path.join(self.directory, s.subdir, path)) + for path + in s.licenses] + + return result + + def link_subpackage(self, name, path): + """Links a subpackage into the BDK. + + Args: + name: the name of the subpackage to link. ValueError if + self has no subpackage matching the given name. + path: the path where the subpackage should be linked from. + + Raises: + SubpackageLinkError: if the subpackage fails to link. + MissingDirectoryError: if the subpackage's subdir is missing. + NoSuchSubpackageError: if name isn't a valid subpackage name. + NotDownloadedError: if the package isn't downloaded. + """ + if not self.is_downloaded(): + raise NotDownloadedError() + + subpackage_ = self._get_subpackage(name) + subdir = os.path.join(self.directory, subpackage_.subdir) + if not os.path.isdir(subdir): + raise MissingDirectoryError('{} for subpackage {}'.format( + subpackage_.subdir, subpackage_.name)) + + try: + parent_dir = os.path.dirname(path) + if parent_dir and not os.path.isdir(parent_dir): + os.makedirs(parent_dir) + os.symlink(subdir, path) + except OSError as e: + raise SubpackageLinkError( + '{} subdir {}: {}'.format( + subpackage_.name, subpackage_.subdir, e)) + + def unlink_subpackage(self, name, path): + """Unlinks a given subpackage. + + The subpackage must be linked to start with. + + Args: + name: the name of the subpackage to unlink. + path: the path where the subpackage is linked from. + + Raises: + NoSuchSubpackageError: if self has no subpackage matching the given + name. + SubpackageUnlinkError: if there is trouble unlinking. + """ + (status_, _) = self.subpackage_status(name, path) + if status_ == status.LINKED: + try: + os.remove(path) + except OSError as e: + raise SubpackageUnlinkError( + '{} (linked from {}): {}.'.format( + name, path, e)) + else: + raise SubpackageUnlinkError('{} is not linked from {}.'.format( + name, path)) + + @classmethod + def from_dict(cls, dic, name): + """Create a Package from a dict. + + Args: + dic: the dictionary to build the package from. + name: the name of the package. + + Returns: + A Package with the given name generated from |dic|. + + Raises: + KeyError: If a required key is missing. + ValueError: If an invalid value is in dic. + """ + package_type = dic[PACKAGE_KEY_TYPE] + package_subclass = None + if package_type == GIT_PACKAGE_TYPE: + package_subclass = GitPackage + elif package_type == TAR_PACKAGE_TYPE: + package_subclass = TarPackage + else: + raise ValueError('Package type must be either {} or {}.'.format( + GIT_PACKAGE_TYPE, TAR_PACKAGE_TYPE)) + + subpackages = {} + for (subpackage_name, data) in dic[PACKAGE_KEY_SUBPACKAGES].iteritems(): + try: + subpackages[subpackage_name] = subpackage.Subpackage.from_dict( + data, subpackage_name) + except KeyError as e: + raise KeyError('subpackage {}: {}'.format(name, e)) + + return package_subclass(name, + dic[PACKAGE_KEY_REMOTE], + dic[PACKAGE_KEY_VERSION], + subpackages) class GitPackage(Package): - def _type_specific_download(self, tarball=None): - """Download a Git repo. See Package.Download.""" - if tarball: - raise ValueError('Initializing Git packages from a tarball ' - 'is not currently supported.') - # Setup. Break down version into branch + hash - # Use rsplit for the off chance ':' is part of the branch name. - (branch, branch_hash) = self.version.rsplit(':', 1) - git = tool.PathToolWrapper('git') - - # Check if branch points to hash - # If so, we can do a shallow clone - try: - out, _ = git.run(['ls-remote', self.remote, branch], piped=True) - except tool.Error as e: - raise RemoteError(e) - if out.startswith(branch_hash): - shallow = True - depth = '--depth=1' - else: - shallow = False - depth = '--single-branch' - - # Clone the repo. - try: - git.run(['clone', depth, '--branch=' + branch, - self.remote, self.directory]) - except tool.Error as e: - raise DownloadError('from {}: {}'.format(self.remote, e)) - - # Checkout the right revision if necessary - if not shallow: - try: - git.set_cwd(self.directory) - git.run(['reset', '--hard', branch_hash]) - git.set_cwd(None) - except tool.Error as e: - raise VersionError(e) - - # Check that the correct revision was downloaded. - git_dir = os.path.join(self.directory, '.git') - try: - out, _ = git.run(['--git-dir=' + git_dir, 'rev-parse', branch], - piped=True) - except tool.Error as e: - raise VersionError(e) - - if not out.startswith(branch_hash): - raise VersionError('Remote package from {} ' - 'is the incorrect version ' - '{} (expected {}).'.format( - self.remote, out, branch_hash)) - - # Everything went well. Clean up the download; we don't need git history. - shutil.rmtree(git_dir) + def _type_specific_download(self, tarball=None): + """Download a Git repo. See Package.Download.""" + if tarball: + raise ValueError('Initializing Git packages from a tarball ' + 'is not currently supported.') + # Setup. Break down version into branch + hash + # Use rsplit for the off chance ':' is part of the branch name. + (branch, branch_hash) = self.version.rsplit(':', 1) + git = tool.PathToolWrapper('git') + + # Check if branch points to hash + # If so, we can do a shallow clone + try: + out, _ = git.run(['ls-remote', self.remote, branch], piped=True) + except tool.Error as e: + raise RemoteError(e) + if out.startswith(branch_hash): + shallow = True + depth = '--depth=1' + else: + shallow = False + depth = '--single-branch' + + # Clone the repo. + try: + git.run(['clone', depth, '--branch=' + branch, + self.remote, self.directory]) + except tool.Error as e: + raise DownloadError('from {}: {}'.format(self.remote, e)) + + # Checkout the right revision if necessary + if not shallow: + try: + git.set_cwd(self.directory) + git.run(['reset', '--hard', branch_hash]) + git.set_cwd(None) + except tool.Error as e: + raise VersionError(e) + + # Check that the correct revision was downloaded. + git_dir = os.path.join(self.directory, '.git') + try: + out, _ = git.run(['--git-dir=' + git_dir, 'rev-parse', branch], + piped=True) + except tool.Error as e: + raise VersionError(e) + + if not out.startswith(branch_hash): + raise VersionError('Remote package from {} ' + 'is the incorrect version ' + '{} (expected {}).'.format( + self.remote, out, branch_hash)) + + # Everything went well. Clean up the download; we don't need git + # history. + shutil.rmtree(git_dir) class TarPackage(Package): - @classmethod - def get_tarball_version(cls, tarball_file): - """Gets the tarball version (sha256) of a tarball file.""" - try: - with open(tarball_file) as tar: - tar_hash = hashlib.sha256(tar.read()).hexdigest() - except IOError as e: - raise VersionError(e) - return tar_hash - - def _type_specific_download(self, tarball_file=None): - """Download a tarball package. See Package.Download.""" - tarball_provided = (tarball_file is not None) - # Store the source for error messages. - tar_source = tarball_file if tarball_provided else self.remote - - tool_runner = tool.PathToolRunner() - - # Get the tarball if it wasn't provided - if not tarball_provided: - # Setup. - tarball_file = os.path.join(self.directory, DEFAULT_TAR_FILE) - - # Download the tarball. - try: - tool_runner.run('curl', ['-o', tarball_file, self.remote]) - except tool.Error as e: - raise DownloadError(e) - - try: - # Check version of the tarball by using a checksum - tar_hash = self.get_tarball_version(tarball_file) - if not tar_hash.startswith(self.version): - raise VersionError('Tarball from {} is the incorrect version {} ' - '(expected {})'.format( - tar_source, tar_hash, self.version)) - - # Unzip the tarball - try: - tool_runner.run('tar', ['-C', self.directory, '-xzf', tarball_file]) - except tool.Error as e: - raise UnzipError(e) - - finally: - # Whether extraction succeeds or fails, - # we should remove the downloaded tarball. - if not tarball_provided: - os.remove(tarball_file) + @classmethod + def get_tarball_version(cls, tarball_file): + """Gets the tarball version (sha256) of a tarball file.""" + try: + with open(tarball_file) as tar: + tar_hash = hashlib.sha256(tar.read()).hexdigest() + except IOError as e: + raise VersionError(e) + return tar_hash + + def _type_specific_download(self, tarball_file=None): + """Download a tarball package. See Package.Download.""" + tarball_provided = (tarball_file is not None) + # Store the source for error messages. + tar_source = tarball_file if tarball_provided else self.remote + + tool_runner = tool.PathToolRunner() + + # Get the tarball if it wasn't provided + if not tarball_provided: + # Setup. + tarball_file = os.path.join(self.directory, DEFAULT_TAR_FILE) + + # Download the tarball. + try: + tool_runner.run('curl', ['-o', tarball_file, self.remote]) + except tool.Error as e: + raise DownloadError(e) + + try: + # Check version of the tarball by using a checksum + tar_hash = self.get_tarball_version(tarball_file) + if not tar_hash.startswith(self.version): + raise VersionError('Tarball from {} is the incorrect version ' + '{} (expected {})'.format( + tar_source, tar_hash, self.version)) + + # Unzip the tarball + try: + tool_runner.run('tar', ['-C', self.directory, '-xzf', + tarball_file]) + except tool.Error as e: + raise UnzipError(e) + + finally: + # Whether extraction succeeds or fails, + # we should remove the downloaded tarball. + if not tarball_provided: + os.remove(tarball_file) diff --git a/cli/lib/bsp/package_stub.py b/cli/lib/bsp/package_stub.py index a89c2cd..e1b980a 100644 --- a/cli/lib/bsp/package_stub.py +++ b/cli/lib/bsp/package_stub.py @@ -16,108 +16,110 @@ """A stub for the Package class""" + from bsp import package from bsp import status + class StubPackage(object): - """Replacement for Packages""" - from_dict_error = None - - def __init__(self, name='', remote='', version='', subpackages=None, - downloaded=True, licenses=None): - self.name = name - self.remote = remote - self.version = version - # Map of statuses, doubles as iterable of subpackages. - self.subpackages = subpackages or {} - self.should_remove = [] - # Map of {subpackages: post-link status} - self.should_link = {} - self.should_unlink = {} - self.should_download = False - self.should_uninstall = False - self.downloaded = downloaded - self.should_refresh = False - self.licenses = licenses or [] - - def is_downloaded(self): - return self.downloaded - - def get_licenses(self, _subpackages): - result = [] - for l in self.licenses: - result.append(package.License(l, l)) - return result - - def subpackage_status(self, name, path=''): - # Will error if no such subpackage - stat = self.subpackages[name] - if not self.downloaded: - stat = status.NOT_INSTALLED - if not path and stat <= status.UNRECOGNIZED: - # Technically UNRECOGNIZED could still be MISSING too, but - # this is a close enough approximation for current tests. - stat = status.INSTALLED - return (stat, '{} - {}'.format(name, status.status_string(stat))) - - # pylint: disable=unused-argument - def download(self, tarball): - if not self.downloaded and not self.should_download: - raise package.DownloadError('Not supposed to download {}'.format( - self.name)) - self.downloaded = True - for s in self.subpackages: - self.subpackages[s] = status.INSTALLED - - def uninstall(self): - if not self.should_uninstall: - raise OSError('Not supposed to uninstall {}'.format(self.name)) - self.downloaded = False - for s in self.subpackages: - self.subpackages[s] = status.NOT_INSTALLED - - def link_subpackage(self, name, path): - if not self.downloaded: - raise package.NotDownloadedError - if not name in self.subpackages: - raise package.NoSuchSubpackageError - if not self.should_link.get(name) == path: - raise package.SubpackageLinkError( - 'Only supposed to link {} to {} (requested {})'.format( - name, self.should_link.get(name), path)) - self.subpackages[name] = status.LINKED - - def unlink_subpackage(self, name, path): - if not name in self.subpackages: - raise package.NoSuchSubpackageError - if not self.subpackages.get(name) == status.LINKED: - raise package.SubpackageUnlinkError('{} is not linked'.format(name)) - if not self.should_unlink.get(name) == path: - raise package.SubpackageUnlinkError( - 'Only supposed to unlink {} from {} (requested {})'.format( - name, self.should_unlink.get(name), path)) - self.subpackages[name] = status.INSTALLED - - @classmethod - def from_dict(cls, _dic, name): - if cls.from_dict_error: - raise ValueError('Error.') - else: - return cls(name) + """Replacement for Packages""" + from_dict_error = None + + def __init__(self, name='', remote='', version='', subpackages=None, + downloaded=True, licenses=None): + self.name = name + self.remote = remote + self.version = version + # Map of statuses, doubles as iterable of subpackages. + self.subpackages = subpackages or {} + self.should_remove = [] + # Map of {subpackages: post-link status} + self.should_link = {} + self.should_unlink = {} + self.should_download = False + self.should_uninstall = False + self.downloaded = downloaded + self.should_refresh = False + self.licenses = licenses or [] + + def is_downloaded(self): + return self.downloaded + + def get_licenses(self, _subpackages): + result = [] + for l in self.licenses: + result.append(package.License(l, l)) + return result + + def subpackage_status(self, name, path=''): + # Will error if no such subpackage + stat = self.subpackages[name] + if not self.downloaded: + stat = status.NOT_INSTALLED + if not path and stat <= status.UNRECOGNIZED: + # Technically UNRECOGNIZED could still be MISSING too, but + # this is a close enough approximation for current tests. + stat = status.INSTALLED + return (stat, '{} - {}'.format(name, status.status_string(stat))) + + # pylint: disable=unused-argument + def download(self, tarball): + if not self.downloaded and not self.should_download: + raise package.DownloadError('Not supposed to download {}'.format( + self.name)) + self.downloaded = True + for s in self.subpackages: + self.subpackages[s] = status.INSTALLED + + def uninstall(self): + if not self.should_uninstall: + raise OSError('Not supposed to uninstall {}'.format(self.name)) + self.downloaded = False + for s in self.subpackages: + self.subpackages[s] = status.NOT_INSTALLED + + def link_subpackage(self, name, path): + if not self.downloaded: + raise package.NotDownloadedError + if not name in self.subpackages: + raise package.NoSuchSubpackageError + if not self.should_link.get(name) == path: + raise package.SubpackageLinkError( + 'Only supposed to link {} to {} (requested {})'.format( + name, self.should_link.get(name), path)) + self.subpackages[name] = status.LINKED + + def unlink_subpackage(self, name, path): + if not name in self.subpackages: + raise package.NoSuchSubpackageError + if not self.subpackages.get(name) == status.LINKED: + raise package.SubpackageUnlinkError('{} is not linked'.format(name)) + if not self.should_unlink.get(name) == path: + raise package.SubpackageUnlinkError( + 'Only supposed to unlink {} from {} (requested {})'.format( + name, self.should_unlink.get(name), path)) + self.subpackages[name] = status.INSTALLED + + @classmethod + def from_dict(cls, _dic, name): + if cls.from_dict_error: + raise ValueError('Error.') + else: + return cls(name) class StubTarPackage(object): - """Replacement for the TarPackage class methods.""" + """Replacement for the TarPackage class methods.""" - def __init__(self, tarball_version=''): - self.tarball_version = tarball_version + def __init__(self, tarball_version=''): + self.tarball_version = tarball_version - def get_tarball_version(self, _tarball): - return self.tarball_version + def get_tarball_version(self, _tarball): + return self.tarball_version class StubPackageModule(object): - """Stubs for the package module as needed.""" + """Stubs for the package module as needed.""" - def __init__(self, tar_package=None): - self.Error = package.Error - self.NotDownloadedError = package.NotDownloadedError - self.TarPackage = tar_package or StubTarPackage() + def __init__(self, tar_package=None): + self.Error = package.Error + self.NotDownloadedError = package.NotDownloadedError + self.TarPackage = tar_package or StubTarPackage() diff --git a/cli/lib/bsp/package_unittest.py b/cli/lib/bsp/package_unittest.py index 939dadb..8a96b33 100644 --- a/cli/lib/bsp/package_unittest.py +++ b/cli/lib/bsp/package_unittest.py @@ -25,569 +25,570 @@ from bsp import subpackage from core import user_config_stub from core import util_stub from test import stubs -from test import test_util class PackageTest(unittest.TestCase): - def setUp(self): - self.stub_os = stubs.StubOs() - self.stub_open = stubs.StubOpen(self.stub_os) - self.stub_hashlib = stubs.StubHashlib() - self.stub_shutil = stubs.StubShutil(self.stub_os) - self.stub_subprocess = stubs.StubSubprocess(stub_os=self.stub_os, - stub_open=self.stub_open) - self.stub_user_config = user_config_stub.StubUserConfig() - self.stub_util = util_stub.StubUtil() - - package.os = self.stub_os - package.open = self.stub_open.open - package.shutil = self.stub_shutil - package.hashlib = self.stub_hashlib - package.tool.subprocess = self.stub_subprocess - package.user_config = self.stub_user_config - package.util = self.stub_util - - self.subpackages = { - 'subpackage1': subpackage.Subpackage('subpackage1', 'subdir1', - ['license1', 'license2']), - 'subpackage2': subpackage.Subpackage('subpackage2', 'sub/dir2', - ['license2', 'license3']) - } - self.pkg = package.Package('name', 'remote', 'version', self.subpackages) - - # Special packages for downloading - self.tar_package = package.TarPackage('name', 'remote', 'hash', - self.subpackages) - self.git_package = package.GitPackage('name', 'remote', 'branch:hash', - self.subpackages) - - # Helpers for subpackage status. - def setup_missing_subpackage(self): - # Package dir is there, but subpackage is not. - self.stub_os.path.should_be_dir.append(self.pkg.directory) - - def setup_installed_subpackage(self, name): - # Like missing, but subpackage is present. - self.setup_missing_subpackage() - subdir_path = self.stub_os.path.join(self.pkg.directory, - self.pkg.subpackages[name].subdir) - self.stub_os.path.should_be_dir.append(subdir_path) - self.stub_os.path.should_exist.append(subdir_path) - - def setup_unknown_subpackage(self, name, path): - # Like installed, but with a file at path. - self.setup_installed_subpackage(name) - self.stub_os.path.should_exist.append(path) - - def setup_linked_subpackage(self, name, path): - # Like unknown, but with a link to the right place. - self.setup_unknown_subpackage(name, path) - self.stub_os.path.should_link[path] = self.stub_os.path.join( - self.pkg.directory, self.pkg.subpackages[name].subdir) - - def setup_broken_link_subpackage(self, name, path): - # Like missing, but with a link to the missing dir. - self.setup_missing_subpackage() - self.stub_os.path.should_exist.append(path) - self.stub_os.path.should_link[path] = self.stub_os.path.join( - self.pkg.directory, self.pkg.subpackages[name].subdir) - - def test_from_dict(self): - package_dict = { - 'package_type': 'git', - 'remote': 'github', - 'version': 'branch:deadbeef', - 'subpackages': { - 'subpackage_1': { - 'subdir': 'sub1', - 'licenses': ['license.txt'] - }, - 'subpackage_2': { - 'subdir': 'sub2', - 'licenses': [] + def setUp(self): + self.stub_os = stubs.StubOs() + self.stub_open = stubs.StubOpen(self.stub_os) + self.stub_hashlib = stubs.StubHashlib() + self.stub_shutil = stubs.StubShutil(self.stub_os) + self.stub_subprocess = stubs.StubSubprocess(stub_os=self.stub_os, + stub_open=self.stub_open) + self.stub_user_config = user_config_stub.StubUserConfig() + self.stub_util = util_stub.StubUtil() + + package.os = self.stub_os + package.open = self.stub_open.open + package.shutil = self.stub_shutil + package.hashlib = self.stub_hashlib + package.tool.subprocess = self.stub_subprocess + package.user_config = self.stub_user_config + package.util = self.stub_util + + self.subpackages = { + 'subpackage1': subpackage.Subpackage('subpackage1', 'subdir1', + ['license1', 'license2']), + 'subpackage2': subpackage.Subpackage('subpackage2', 'sub/dir2', + ['license2', 'license3']) + } + self.pkg = package.Package('name', 'remote', 'version', + self.subpackages) + + # Special packages for downloading + self.tar_package = package.TarPackage('name', 'remote', 'hash', + self.subpackages) + self.git_package = package.GitPackage('name', 'remote', 'branch:hash', + self.subpackages) + + # Helpers for subpackage status. + def setup_missing_subpackage(self): + # Package dir is there, but subpackage is not. + self.stub_os.path.should_be_dir.append(self.pkg.directory) + + def setup_installed_subpackage(self, name): + # Like missing, but subpackage is present. + self.setup_missing_subpackage() + subdir_path = self.stub_os.path.join(self.pkg.directory, + self.pkg.subpackages[name].subdir) + self.stub_os.path.should_be_dir.append(subdir_path) + self.stub_os.path.should_exist.append(subdir_path) + + def setup_unknown_subpackage(self, name, path): + # Like installed, but with a file at path. + self.setup_installed_subpackage(name) + self.stub_os.path.should_exist.append(path) + + def setup_linked_subpackage(self, name, path): + # Like unknown, but with a link to the right place. + self.setup_unknown_subpackage(name, path) + self.stub_os.path.should_link[path] = self.stub_os.path.join( + self.pkg.directory, self.pkg.subpackages[name].subdir) + + def setup_broken_link_subpackage(self, name, path): + # Like missing, but with a link to the missing dir. + self.setup_missing_subpackage() + self.stub_os.path.should_exist.append(path) + self.stub_os.path.should_link[path] = self.stub_os.path.join( + self.pkg.directory, self.pkg.subpackages[name].subdir) + + def test_from_dict(self): + package_dict = { + 'package_type': 'git', + 'remote': 'github', + 'version': 'branch:deadbeef', + 'subpackages': { + 'subpackage_1': { + 'subdir': 'sub1', + 'licenses': ['license.txt'] + }, + 'subpackage_2': { + 'subdir': 'sub2', + 'licenses': [] + } } } - } - pkg = package.Package.from_dict(package_dict, 'package_1') - self.assertIsInstance(pkg, package.Package) - self.assertEqual(len(pkg.subpackages), 2) - self.assertTrue('subpackage_1' in pkg.subpackages) - self.assertEqual(pkg.remote, 'github') - self.assertEqual(pkg.version, 'branch:deadbeef') - - # Check git vs. tar packages. - self.assertIsInstance(pkg, package.GitPackage) - package_dict['package_type'] = 'tar' - pkg = package.Package.from_dict(package_dict, 'package_1') - self.assertIsInstance(pkg, package.TarPackage) - - def test_from_dict_invalid(self): - package_dict = { - 'package_type': 'not_git_or_tar', - 'remote': 'github', - 'version': 'branch:deadbeef', - 'subpackages': { - 'subpackage_1': { - 'subdir': 'sub1', - 'licenses': ['license.txt'] - }, - 'subpackage_2': { - 'subdir': 'sub2', - 'licenses': [] + pkg = package.Package.from_dict(package_dict, 'package_1') + self.assertIsInstance(pkg, package.Package) + self.assertEqual(len(pkg.subpackages), 2) + self.assertTrue('subpackage_1' in pkg.subpackages) + self.assertEqual(pkg.remote, 'github') + self.assertEqual(pkg.version, 'branch:deadbeef') + + # Check git vs. tar packages. + self.assertIsInstance(pkg, package.GitPackage) + package_dict['package_type'] = 'tar' + pkg = package.Package.from_dict(package_dict, 'package_1') + self.assertIsInstance(pkg, package.TarPackage) + + def test_from_dict_invalid(self): + package_dict = { + 'package_type': 'not_git_or_tar', + 'remote': 'github', + 'version': 'branch:deadbeef', + 'subpackages': { + 'subpackage_1': { + 'subdir': 'sub1', + 'licenses': ['license.txt'] + }, + 'subpackage_2': { + 'subdir': 'sub2', + 'licenses': [] + } } } - } - # Type must be git or tar. - with self.assertRaises(ValueError): - package.Package.from_dict(package_dict, 'package_1') - - def test_path(self): - self.assertEqual( - self.pkg.directory, - self.stub_os.path.join(self.stub_user_config.USER_CONFIG.bsp_dir, - self.pkg.name, - 'source_versions', - self.pkg.version)) - # If we update the user config, bsp dir should update accordingly; - # it shouldn't be cached. - self.stub_user_config.USER_CONFIG.bsp_dir = 'who/knows/where' - self.assertEqual( - self.pkg.directory, - self.stub_os.path.join(self.stub_user_config.USER_CONFIG.bsp_dir, - self.pkg.name, - 'source_versions', - self.pkg.version)) - - - def test_downloaded(self): - self.stub_os.path.should_be_dir = [self.pkg.directory] - self.assertTrue(self.pkg.is_downloaded()) - - def test_not_downloaded(self): - self.assertFalse(self.pkg.is_downloaded()) - - def test_subpackage_status_nonexistent_subpackage(self): - # Should get some sort of error about 'not_a_subpackage' - # not being a real subpackage of pkg. - with self.assertRaisesRegexp(package.NoSuchSubpackageError, - 'not_a_subpackage'): - self.pkg.subpackage_status('not_a_subpackage', 'path') - - def test_subpackage_status_not_installed(self): - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], - status.NOT_INSTALLED) - # Not installed with no path: still not installed. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.NOT_INSTALLED) - - def test_subpackage_status_installed(self): - self.setup_installed_subpackage('subpackage1') - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], - status.INSTALLED) - # Installed with no path: still installed. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.INSTALLED) - - - def test_subpackage_status_linked(self): - self.setup_linked_subpackage('subpackage1', 'link') - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'link')[0], - status.LINKED) - # Linked with no path: just installed. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.INSTALLED) - - - def test_subpackage_status_missing(self): - self.setup_missing_subpackage() - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], - status.MISSING) - # Missing with no path: still missing. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.MISSING) - - def test_subpackage_status_broken_link(self): - self.setup_broken_link_subpackage('subpackage1', 'link') - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'link')[0], - status.UNRECOGNIZED) - # Broken link because missing, with no path: just missing. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.MISSING) - - def test_subpackage_status_unknown(self): - self.setup_unknown_subpackage('subpackage1', '???') - self.assertEqual(self.pkg.subpackage_status('subpackage1', '???')[0], - status.UNRECOGNIZED) - # Unknown but installed, with no path: just installed. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.INSTALLED) - - def test_subpackage_status_incorrect_version(self): - self.setup_installed_subpackage('subpackage1') - self.pkg.version = 'different_version' - # Different versions are treated as completely different BSPs for now. - self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], - status.NOT_INSTALLED) - # Still not installed with no path. - self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], - status.NOT_INSTALLED) - - def test_tarball_version(self): - file1 = stubs.StubFile('file1') - self.stub_os.path.should_exist = ['file1'] - self.stub_open.files = {'file1': file1} - self.stub_hashlib.should_return = '<hash_of_file1>' - self.assertEqual(package.TarPackage.get_tarball_version('file1'), - '<hash_of_file1>') - - def test_tarball_invalid_version(self): - self.stub_os.path.should_exist = [] - with self.assertRaises(package.VersionError): - package.TarPackage.get_tarball_version('file1') - - def test_already_downloaded(self): - self.stub_os.path.should_be_dir = [self.pkg.directory] - # Shouldn't do any downloading - no dirs should be made. - self.stub_os.should_makedirs = [] - self.pkg.download() - - def test_download_exception(self): - # pkg is the abstract Package class, - # download isn't fully implemented - self.stub_os.should_makedirs = [self.pkg.directory] - with self.assertRaises(NotImplementedError): - self.pkg.download() - # downloaded failed, should clean up the download directory. - self.assertFalse(self.stub_os.path.exists(self.pkg.directory)) - - def test_git_download_shallow(self): - self.stub_os.should_makedirs = [self.git_package.directory] - # Note: we don't test that tmp_dir/.git exists, because that's - # hard to write cleanly; so we assume it is created and exists - # as expected by clone/rev-parse. - ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') - clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) - rev_parse = self.stub_subprocess.AddCommand(ret_out='hash') - - self.git_package.download() - self.assertTrue(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) - clone.AssertCallContained(['git', 'clone']) - rev_parse.AssertCallContained(['git', 'rev-parse']) - - def test_git_download_full(self): - self.stub_os.should_makedirs = [self.git_package.directory] - # Note: we don't test that tmp_dir/.git exists, because that's - # hard to write cleanly; so we assume it is created and exists - # as expected by clone/rev-parse. - ls_remote = self.stub_subprocess.AddCommand(ret_out='wronghash') - clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) - reset = self.stub_subprocess.AddCommand() - rev_parse = self.stub_subprocess.AddCommand(ret_out='hash') - - self.git_package.download() - self.assertTrue(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) - clone.AssertCallContained(['git', 'clone']) - reset.AssertCallContained(['git', 'reset']) - rev_parse.AssertCallContained(['git', 'rev-parse']) - - def test_git_failed_lsremote(self): - self.stub_os.should_makedirs = [self.git_package.directory] - ls_remote = self.stub_subprocess.AddCommand(ret_code=-1) - - with self.assertRaises(package.RemoteError): - self.git_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) + # Type must be git or tar. + with self.assertRaises(ValueError): + package.Package.from_dict(package_dict, 'package_1') + + def test_path(self): + self.assertEqual( + self.pkg.directory, + self.stub_os.path.join(self.stub_user_config.USER_CONFIG.bsp_dir, + self.pkg.name, + 'source_versions', + self.pkg.version)) + # If we update the user config, bsp dir should update accordingly; + # it shouldn't be cached. + self.stub_user_config.USER_CONFIG.bsp_dir = 'who/knows/where' + self.assertEqual( + self.pkg.directory, + self.stub_os.path.join(self.stub_user_config.USER_CONFIG.bsp_dir, + self.pkg.name, + 'source_versions', + self.pkg.version)) + + + def test_downloaded(self): + self.stub_os.path.should_be_dir = [self.pkg.directory] + self.assertTrue(self.pkg.is_downloaded()) + + def test_not_downloaded(self): + self.assertFalse(self.pkg.is_downloaded()) + + def test_subpackage_status_nonexistent_subpackage(self): + # Should get some sort of error about 'not_a_subpackage' + # not being a real subpackage of pkg. + with self.assertRaisesRegexp(package.NoSuchSubpackageError, + 'not_a_subpackage'): + self.pkg.subpackage_status('not_a_subpackage', 'path') + + def test_subpackage_status_not_installed(self): + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], + status.NOT_INSTALLED) + # Not installed with no path: still not installed. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.NOT_INSTALLED) + + def test_subpackage_status_installed(self): + self.setup_installed_subpackage('subpackage1') + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], + status.INSTALLED) + # Installed with no path: still installed. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.INSTALLED) + + + def test_subpackage_status_linked(self): + self.setup_linked_subpackage('subpackage1', 'link') + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'link')[0], + status.LINKED) + # Linked with no path: just installed. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.INSTALLED) + + + def test_subpackage_status_missing(self): + self.setup_missing_subpackage() + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], + status.MISSING) + # Missing with no path: still missing. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.MISSING) + + def test_subpackage_status_broken_link(self): + self.setup_broken_link_subpackage('subpackage1', 'link') + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'link')[0], + status.UNRECOGNIZED) + # Broken link because missing, with no path: just missing. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.MISSING) + + def test_subpackage_status_unknown(self): + self.setup_unknown_subpackage('subpackage1', '???') + self.assertEqual(self.pkg.subpackage_status('subpackage1', '???')[0], + status.UNRECOGNIZED) + # Unknown but installed, with no path: just installed. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.INSTALLED) + + def test_subpackage_status_incorrect_version(self): + self.setup_installed_subpackage('subpackage1') + self.pkg.version = 'different_version' + # Different versions are treated as completely different BSPs for now. + self.assertEqual(self.pkg.subpackage_status('subpackage1', 'path')[0], + status.NOT_INSTALLED) + # Still not installed with no path. + self.assertEqual(self.pkg.subpackage_status('subpackage1')[0], + status.NOT_INSTALLED) + + def test_tarball_version(self): + file1 = stubs.StubFile('file1') + self.stub_os.path.should_exist = ['file1'] + self.stub_open.files = {'file1': file1} + self.stub_hashlib.should_return = '<hash_of_file1>' + self.assertEqual(package.TarPackage.get_tarball_version('file1'), + '<hash_of_file1>') + + def test_tarball_invalid_version(self): + self.stub_os.path.should_exist = [] + with self.assertRaises(package.VersionError): + package.TarPackage.get_tarball_version('file1') + + def test_already_downloaded(self): + self.stub_os.path.should_be_dir = [self.pkg.directory] + # Shouldn't do any downloading - no dirs should be made. + self.stub_os.should_makedirs = [] + self.pkg.download() + + def test_download_exception(self): + # pkg is the abstract Package class, + # download isn't fully implemented + self.stub_os.should_makedirs = [self.pkg.directory] + with self.assertRaises(NotImplementedError): + self.pkg.download() + # downloaded failed, should clean up the download directory. + self.assertFalse(self.stub_os.path.exists(self.pkg.directory)) + + def test_git_download_shallow(self): + self.stub_os.should_makedirs = [self.git_package.directory] + # Note: we don't test that tmp_dir/.git exists, because that's + # hard to write cleanly; so we assume it is created and exists + # as expected by clone/rev-parse. + ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') + clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) + rev_parse = self.stub_subprocess.AddCommand(ret_out='hash') - def test_git_failed_download(self): - self.stub_os.should_makedirs = [self.git_package.directory] - ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') - clone = self.stub_subprocess.AddFileCommand(pos_out=[-1], - ret_code=-1) - - with self.assertRaises(package.DownloadError): - self.git_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) - clone.AssertCallContained(['git', 'clone']) - - def test_git_failed_reset(self): - self.stub_os.should_makedirs = [self.git_package.directory] - # Note: we don't test that tmp_dir/.git exists, because that's - # hard to write cleanly; so we assume it is created and exists - # as expected by clone/rev-parse. - ls_remote = self.stub_subprocess.AddCommand(ret_out='wronghash') - clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) - reset = self.stub_subprocess.AddCommand(ret_code=-1) - - with self.assertRaises(package.VersionError): self.git_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) - clone.AssertCallContained(['git', 'clone']) - reset.AssertCallContained(['git', 'reset']) - - def test_git_wrong_version(self): - self.stub_os.should_makedirs = [self.git_package.directory] - # Note: we don't test that tmp_dir/.git exists, because that's - # hard to write cleanly; so we assume it is created and exists - # as expected by clone/rev-parse. - ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') - clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) - rev_parse = self.stub_subprocess.AddCommand(ret_out='wronghash') - - with self.assertRaises(package.VersionError): - self.git_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) - ls_remote.AssertCallContained(['git', 'ls-remote']) - clone.AssertCallContained(['git', 'clone']) - rev_parse.AssertCallContained(['git', 'rev-parse']) - - def test_git_tarball(self): - self.stub_os.should_makedirs = [self.git_package.directory] - # Some sort of error about no tarballs for git packages - with self.assertRaisesRegexp(ValueError, 'tarball'): - self.git_package.download(tarball='file') - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) - - def test_tar_download(self): - self.stub_os.should_makedirs = [self.tar_package.directory] - curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) - # Again, side effects of popen are hard to encode, so this doesn't - # reflect that the '-C' output directory will get populated by tar. - tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf']) - self.stub_hashlib.should_return = self.tar_package.version - - self.tar_package.download() - self.assertTrue(self.stub_os.path.exists(self.tar_package.directory)) - curl.AssertCallContained(['curl', '-o']) - tar.AssertCallContained(['tar', '-C', '-xzf']) - - def test_tar_failed_download(self): - self.stub_os.should_makedirs = [self.tar_package.directory] - curl = self.stub_subprocess.AddFileCommand(out_args=['-o'], ret_code=-1) - self.stub_hashlib.should_return = self.tar_package.version - - with self.assertRaises(package.DownloadError): - self.tar_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) - curl.AssertCallContained(['curl', '-o']) + self.assertTrue(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + clone.AssertCallContained(['git', 'clone']) + rev_parse.AssertCallContained(['git', 'rev-parse']) + + def test_git_download_full(self): + self.stub_os.should_makedirs = [self.git_package.directory] + # Note: we don't test that tmp_dir/.git exists, because that's + # hard to write cleanly; so we assume it is created and exists + # as expected by clone/rev-parse. + ls_remote = self.stub_subprocess.AddCommand(ret_out='wronghash') + clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) + reset = self.stub_subprocess.AddCommand() + rev_parse = self.stub_subprocess.AddCommand(ret_out='hash') - def test_tar_wrong_version(self): - self.stub_os.should_makedirs = [self.tar_package.directory] - curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) - self.stub_hashlib.should_return = '0xWRONG' + self.git_package.download() + self.assertTrue(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + clone.AssertCallContained(['git', 'clone']) + reset.AssertCallContained(['git', 'reset']) + rev_parse.AssertCallContained(['git', 'rev-parse']) + + def test_git_failed_lsremote(self): + self.stub_os.should_makedirs = [self.git_package.directory] + ls_remote = self.stub_subprocess.AddCommand(ret_code=-1) + + with self.assertRaises(package.RemoteError): + self.git_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + + def test_git_failed_download(self): + self.stub_os.should_makedirs = [self.git_package.directory] + ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') + clone = self.stub_subprocess.AddFileCommand(pos_out=[-1], + ret_code=-1) + + with self.assertRaises(package.DownloadError): + self.git_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + clone.AssertCallContained(['git', 'clone']) + + def test_git_failed_reset(self): + self.stub_os.should_makedirs = [self.git_package.directory] + # Note: we don't test that tmp_dir/.git exists, because that's + # hard to write cleanly; so we assume it is created and exists + # as expected by clone/rev-parse. + ls_remote = self.stub_subprocess.AddCommand(ret_out='wronghash') + clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) + reset = self.stub_subprocess.AddCommand(ret_code=-1) + + with self.assertRaises(package.VersionError): + self.git_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + clone.AssertCallContained(['git', 'clone']) + reset.AssertCallContained(['git', 'reset']) + + def test_git_wrong_version(self): + self.stub_os.should_makedirs = [self.git_package.directory] + # Note: we don't test that tmp_dir/.git exists, because that's + # hard to write cleanly; so we assume it is created and exists + # as expected by clone/rev-parse. + ls_remote = self.stub_subprocess.AddCommand(ret_out='hash') + clone = self.stub_subprocess.AddFileCommand(pos_out=[-1]) + rev_parse = self.stub_subprocess.AddCommand(ret_out='wronghash') + + with self.assertRaises(package.VersionError): + self.git_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) + ls_remote.AssertCallContained(['git', 'ls-remote']) + clone.AssertCallContained(['git', 'clone']) + rev_parse.AssertCallContained(['git', 'rev-parse']) + + def test_git_tarball(self): + self.stub_os.should_makedirs = [self.git_package.directory] + # Some sort of error about no tarballs for git packages + with self.assertRaisesRegexp(ValueError, 'tarball'): + self.git_package.download(tarball='file') + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.git_package.directory)) + + def test_tar_download(self): + self.stub_os.should_makedirs = [self.tar_package.directory] + curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) + # Again, side effects of popen are hard to encode, so this doesn't + # reflect that the '-C' output directory will get populated by tar. + tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf']) + self.stub_hashlib.should_return = self.tar_package.version - with self.assertRaisesRegexp(package.VersionError, - 'incorrect version 0xWRONG'): - self.tar_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) - curl.AssertCallContained(['curl', '-o']) - - def test_tar_bad_unzip(self): - self.stub_os.should_makedirs = [self.tar_package.directory] - curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) - tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf'], - ret_code=-1) - self.stub_hashlib.should_return = self.tar_package.version - - with self.assertRaises(package.UnzipError): self.tar_package.download() - # Leave in a clean state - self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) - curl.AssertCallContained(['curl', '-o']) - tar.AssertCallContained(['tar', '-C', '-xzf']) - - def test_tar_tarball(self): - self.stub_os.path.should_exist = ['tar_file'] - self.stub_open.files['tar_file'] = stubs.StubFile() - self.stub_os.should_makedirs = [self.tar_package.directory] - tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf']) - self.stub_hashlib.should_return = self.tar_package.version - - self.tar_package.download('tar_file') - self.assertTrue(self.stub_os.path.exists(self.tar_package.directory)) - # Make sure the tar file is still where it was. - self.assertTrue(self.stub_os.path.exists('tar_file')) - tar.AssertCallContained(['tar', '-C', '-xzf']) - - def test_uninstall(self): - dirs = [self.pkg.directory, - self.stub_os.path.join(self.pkg.directory, 'subdir1'), - self.stub_os.path.join(self.pkg.directory, 'subdir2'), - self.stub_os.path.join(self.pkg.directory, 'subdir2', 'deeper')] - self.stub_os.path.should_be_dir.extend(dirs) - self.stub_os.path.should_exist.extend(dirs) - files = [self.stub_os.path.join(dirname, 'a_file') for dirname in dirs] - self.stub_os.path.should_exist.extend(files) - - self.assertTrue(self.pkg.is_downloaded()) - self.pkg.uninstall() - for f in files: - self.assertFalse(self.stub_os.path.exists(f)) - for d in dirs: - self.assertFalse(self.stub_os.path.exists(d)) - self.assertFalse(self.stub_os.path.isdir(d)) - self.assertFalse(self.pkg.is_downloaded()) - - def test_get_licenses(self): - licenses = self.pkg.get_licenses(['subpackage1', 'subpackage2']) - expected_names = ['name.subpackage1 - license1', - 'name.subpackage1 - license2', - 'name.subpackage2 - license2', - 'name.subpackage2 - license3'] - - for l in licenses: - self.assertTrue(l.name in expected_names) - expected_names.remove(l.name) - self.assertEqual(expected_names, []) - - def test_get_some_licenses(self): - licenses = self.pkg.get_licenses(['subpackage2']) - expected_names = ['name.subpackage2 - license2', - 'name.subpackage2 - license3'] - - for l in licenses: - self.assertTrue(l.name in expected_names) - expected_names.remove(l.name) - self.assertEqual(expected_names, []) - - def test_get_no_licenses(self): - licenses = self.pkg.get_licenses([]) - self.assertEqual(licenses, []) - - def test_link_subpackage(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist.append(subpackage_dir) - self.stub_os.should_create_link = [(subpackage_dir, 'link_origin')] - - self.pkg.link_subpackage('subpackage1', 'link_origin') - # Correct link should be created. - self.assertTrue(self.stub_os.path.islink('link_origin')) - self.assertEqual(self.stub_os.readlink('link_origin'), - subpackage_dir) - - def test_link_not_downloaded(self): - with self.assertRaises(package.NotDownloadedError): - self.pkg.link_subpackage('subpackage1', 'link_origin') - # No link should be created. - self.assertFalse(self.stub_os.path.exists('link_origin')) - - def test_link_missing(self): - self.stub_os.path.should_be_dir = [self.pkg.directory] - - with self.assertRaises(package.MissingDirectoryError): - self.pkg.link_subpackage('subpackage1', 'link_origin') - # No link should be created. - self.assertFalse(self.stub_os.path.exists('link_origin')) - - def test_link_fail(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist.append(subpackage_dir) - - with self.assertRaises(package.SubpackageLinkError): - self.pkg.link_subpackage('subpackage1', 'link_origin') - # No link should be created. - self.assertFalse(self.stub_os.path.exists('link_origin')) - - def test_link_overwrite(self): - (_, subpackage_) = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] - - # Should still be a link error. The caller needs to check for this. - with self.assertRaises(package.SubpackageLinkError): - self.pkg.link_subpackage('subpackage1', 'link_origin') - # No link should be created, the file should still be there. - self.assertTrue(self.stub_os.path.exists('link_origin')) - self.assertFalse(self.stub_os.path.islink('link_origin')) - - def test_unlink(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] - self.stub_os.path.should_link['link_origin'] = subpackage_dir - - self.pkg.unlink_subpackage('subpackage1', 'link_origin') - # Should be unlinked. - self.assertFalse(self.stub_os.path.islink('link_origin')) - self.assertFalse(self.stub_os.path.exists('link_origin')) - - def test_unlink_wrong_link(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist += [subpackage_dir, 'link_origin', - 'somewhere_else'] - self.stub_os.path.should_link['link_origin'] = 'somewhere_else' - - with self.assertRaises(package.SubpackageUnlinkError): - self.pkg.unlink_subpackage('subpackage1', 'link_origin') - # Original link should be in place. - self.assertTrue(self.stub_os.path.exists('link_origin')) - self.assertTrue(self.stub_os.path.islink('link_origin')) - self.assertEqual(self.stub_os.readlink('link_origin'), - 'somewhere_else') - - def test_unlink_no_link(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] - - with self.assertRaises(package.SubpackageUnlinkError): - self.pkg.unlink_subpackage('subpackage1', 'link_origin') - # Original lack of link should be in place. - self.assertTrue(self.stub_os.path.exists('link_origin')) - self.assertFalse(self.stub_os.path.islink('link_origin')) - - def test_unlink_dead_link(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - # Dead links don't exist for os.path purposes. - self.stub_os.path.should_exist += [subpackage_dir] - self.stub_os.path.should_link['link_origin'] = 'somewhere_else' - - with self.assertRaises(package.SubpackageUnlinkError): - self.pkg.unlink_subpackage('subpackage1', 'link_origin') - # Original dead link should be in place. - self.assertTrue(self.stub_os.path.islink('link_origin')) - self.assertEqual(self.stub_os.readlink('link_origin'), - 'somewhere_else') - - def test_unlink_nothing(self): - _, subpackage_ = self.pkg.subpackages.items()[0] - subpackage_dir = self.stub_os.path.join(self.pkg.directory, - subpackage_.subdir) - self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] - self.stub_os.path.should_exist += [subpackage_dir] - - with self.assertRaises(package.SubpackageUnlinkError): - self.pkg.unlink_subpackage('subpackage1', 'link_origin') - # Should still be nothing. - self.assertFalse(self.stub_os.path.islink('link_origin')) - self.assertFalse(self.stub_os.path.exists('link_origin')) + self.assertTrue(self.stub_os.path.exists(self.tar_package.directory)) + curl.AssertCallContained(['curl', '-o']) + tar.AssertCallContained(['tar', '-C', '-xzf']) + + def test_tar_failed_download(self): + self.stub_os.should_makedirs = [self.tar_package.directory] + curl = self.stub_subprocess.AddFileCommand(out_args=['-o'], ret_code=-1) + self.stub_hashlib.should_return = self.tar_package.version + + with self.assertRaises(package.DownloadError): + self.tar_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) + curl.AssertCallContained(['curl', '-o']) + + def test_tar_wrong_version(self): + self.stub_os.should_makedirs = [self.tar_package.directory] + curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) + self.stub_hashlib.should_return = '0xWRONG' + + with self.assertRaisesRegexp(package.VersionError, + 'incorrect version 0xWRONG'): + self.tar_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) + curl.AssertCallContained(['curl', '-o']) + + def test_tar_bad_unzip(self): + self.stub_os.should_makedirs = [self.tar_package.directory] + curl = self.stub_subprocess.AddFileCommand(out_args=['-o']) + tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf'], + ret_code=-1) + self.stub_hashlib.should_return = self.tar_package.version + + with self.assertRaises(package.UnzipError): + self.tar_package.download() + # Leave in a clean state + self.assertFalse(self.stub_os.path.exists(self.tar_package.directory)) + curl.AssertCallContained(['curl', '-o']) + tar.AssertCallContained(['tar', '-C', '-xzf']) + + def test_tar_tarball(self): + self.stub_os.path.should_exist = ['tar_file'] + self.stub_open.files['tar_file'] = stubs.StubFile() + self.stub_os.should_makedirs = [self.tar_package.directory] + tar = self.stub_subprocess.AddFileCommand(in_args=['-C', '-xzf']) + self.stub_hashlib.should_return = self.tar_package.version + + self.tar_package.download('tar_file') + self.assertTrue(self.stub_os.path.exists(self.tar_package.directory)) + # Make sure the tar file is still where it was. + self.assertTrue(self.stub_os.path.exists('tar_file')) + tar.AssertCallContained(['tar', '-C', '-xzf']) + + def test_uninstall(self): + dirs = [self.pkg.directory, + self.stub_os.path.join(self.pkg.directory, 'subdir1'), + self.stub_os.path.join(self.pkg.directory, 'subdir2'), + self.stub_os.path.join(self.pkg.directory, 'subdir2', + 'deeper')] + self.stub_os.path.should_be_dir.extend(dirs) + self.stub_os.path.should_exist.extend(dirs) + files = [self.stub_os.path.join(dirname, 'a_file') for dirname in dirs] + self.stub_os.path.should_exist.extend(files) + + self.assertTrue(self.pkg.is_downloaded()) + self.pkg.uninstall() + for f in files: + self.assertFalse(self.stub_os.path.exists(f)) + for d in dirs: + self.assertFalse(self.stub_os.path.exists(d)) + self.assertFalse(self.stub_os.path.isdir(d)) + self.assertFalse(self.pkg.is_downloaded()) + + def test_get_licenses(self): + licenses = self.pkg.get_licenses(['subpackage1', 'subpackage2']) + expected_names = ['name.subpackage1 - license1', + 'name.subpackage1 - license2', + 'name.subpackage2 - license2', + 'name.subpackage2 - license3'] + + for l in licenses: + self.assertTrue(l.name in expected_names) + expected_names.remove(l.name) + self.assertEqual(expected_names, []) + + def test_get_some_licenses(self): + licenses = self.pkg.get_licenses(['subpackage2']) + expected_names = ['name.subpackage2 - license2', + 'name.subpackage2 - license3'] + + for l in licenses: + self.assertTrue(l.name in expected_names) + expected_names.remove(l.name) + self.assertEqual(expected_names, []) + + def test_get_no_licenses(self): + licenses = self.pkg.get_licenses([]) + self.assertEqual(licenses, []) + + def test_link_subpackage(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist.append(subpackage_dir) + self.stub_os.should_create_link = [(subpackage_dir, 'link_origin')] + + self.pkg.link_subpackage('subpackage1', 'link_origin') + # Correct link should be created. + self.assertTrue(self.stub_os.path.islink('link_origin')) + self.assertEqual(self.stub_os.readlink('link_origin'), + subpackage_dir) + + def test_link_not_downloaded(self): + with self.assertRaises(package.NotDownloadedError): + self.pkg.link_subpackage('subpackage1', 'link_origin') + # No link should be created. + self.assertFalse(self.stub_os.path.exists('link_origin')) + + def test_link_missing(self): + self.stub_os.path.should_be_dir = [self.pkg.directory] + + with self.assertRaises(package.MissingDirectoryError): + self.pkg.link_subpackage('subpackage1', 'link_origin') + # No link should be created. + self.assertFalse(self.stub_os.path.exists('link_origin')) + + def test_link_fail(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist.append(subpackage_dir) + + with self.assertRaises(package.SubpackageLinkError): + self.pkg.link_subpackage('subpackage1', 'link_origin') + # No link should be created. + self.assertFalse(self.stub_os.path.exists('link_origin')) + + def test_link_overwrite(self): + (_, subpackage_) = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] + + # Should still be a link error. The caller needs to check for this. + with self.assertRaises(package.SubpackageLinkError): + self.pkg.link_subpackage('subpackage1', 'link_origin') + # No link should be created, the file should still be there. + self.assertTrue(self.stub_os.path.exists('link_origin')) + self.assertFalse(self.stub_os.path.islink('link_origin')) + + def test_unlink(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] + self.stub_os.path.should_link['link_origin'] = subpackage_dir + + self.pkg.unlink_subpackage('subpackage1', 'link_origin') + # Should be unlinked. + self.assertFalse(self.stub_os.path.islink('link_origin')) + self.assertFalse(self.stub_os.path.exists('link_origin')) + + def test_unlink_wrong_link(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist += [subpackage_dir, 'link_origin', + 'somewhere_else'] + self.stub_os.path.should_link['link_origin'] = 'somewhere_else' + + with self.assertRaises(package.SubpackageUnlinkError): + self.pkg.unlink_subpackage('subpackage1', 'link_origin') + # Original link should be in place. + self.assertTrue(self.stub_os.path.exists('link_origin')) + self.assertTrue(self.stub_os.path.islink('link_origin')) + self.assertEqual(self.stub_os.readlink('link_origin'), + 'somewhere_else') + + def test_unlink_no_link(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist += [subpackage_dir, 'link_origin'] + + with self.assertRaises(package.SubpackageUnlinkError): + self.pkg.unlink_subpackage('subpackage1', 'link_origin') + # Original lack of link should be in place. + self.assertTrue(self.stub_os.path.exists('link_origin')) + self.assertFalse(self.stub_os.path.islink('link_origin')) + + def test_unlink_dead_link(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + # Dead links don't exist for os.path purposes. + self.stub_os.path.should_exist += [subpackage_dir] + self.stub_os.path.should_link['link_origin'] = 'somewhere_else' + + with self.assertRaises(package.SubpackageUnlinkError): + self.pkg.unlink_subpackage('subpackage1', 'link_origin') + # Original dead link should be in place. + self.assertTrue(self.stub_os.path.islink('link_origin')) + self.assertEqual(self.stub_os.readlink('link_origin'), + 'somewhere_else') + + def test_unlink_nothing(self): + _, subpackage_ = self.pkg.subpackages.items()[0] + subpackage_dir = self.stub_os.path.join(self.pkg.directory, + subpackage_.subdir) + self.stub_os.path.should_be_dir = [self.pkg.directory, subpackage_dir] + self.stub_os.path.should_exist += [subpackage_dir] + + with self.assertRaises(package.SubpackageUnlinkError): + self.pkg.unlink_subpackage('subpackage1', 'link_origin') + # Should still be nothing. + self.assertFalse(self.stub_os.path.islink('link_origin')) + self.assertFalse(self.stub_os.path.exists('link_origin')) diff --git a/cli/lib/bsp/status.py b/cli/lib/bsp/status.py index 26466cf..60e2703 100644 --- a/cli/lib/bsp/status.py +++ b/cli/lib/bsp/status.py @@ -27,16 +27,16 @@ NOT_INSTALLED = 4 def status_string(status): - """Human readable version of a status.""" - if status == INSTALLED: - return "Installed" - elif status == LINKED: - return "Linked in OS tree" - elif status == UNRECOGNIZED: - return "Unrecognized" - elif status == MISSING: - return "Files missing from package" - elif status == NOT_INSTALLED: - return "Not Installed" - else: - return "Unknown State" + """Human readable version of a status.""" + if status == INSTALLED: + return "Installed" + elif status == LINKED: + return "Linked in OS tree" + elif status == UNRECOGNIZED: + return "Unrecognized" + elif status == MISSING: + return "Files missing from package" + elif status == NOT_INSTALLED: + return "Not Installed" + else: + return "Unknown State" diff --git a/cli/lib/bsp/subpackage.py b/cli/lib/bsp/subpackage.py index 6efa211..e850007 100644 --- a/cli/lib/bsp/subpackage.py +++ b/cli/lib/bsp/subpackage.py @@ -21,32 +21,32 @@ SUBPACKAGE_KEY_LICENSES = 'licenses' class Subpackage(object): - """Class for subpackages. + """Class for subpackages. - Subpackages are just dumb data. + Subpackages are just dumb data. - Attributes: - name: the name of the subpackage. - subdir: the subdir of the remote package to extract. - licenses: list of license file paths, relative to subdir. - """ - - def __init__(self, name, subdir, licenses): - self.name = name - self.subdir = subdir - self.licenses = licenses - - @classmethod - def from_dict(cls, dic, name): - """Create a SubPackage from a dict. - - Args: - dic: the dictionary to build the subpackage from. - name: the name of the subpackage. - - Returns: - A subpackage named |name| generated from |dic|. + Attributes: + name: the name of the subpackage. + subdir: the subdir of the remote package to extract. + licenses: list of license file paths, relative to subdir. """ - return cls(name, - dic[SUBPACKAGE_KEY_SUBDIR], - dic[SUBPACKAGE_KEY_LICENSES]) + + def __init__(self, name, subdir, licenses): + self.name = name + self.subdir = subdir + self.licenses = licenses + + @classmethod + def from_dict(cls, dic, name): + """Create a SubPackage from a dict. + + Args: + dic: the dictionary to build the subpackage from. + name: the name of the subpackage. + + Returns: + A subpackage named |name| generated from |dic|. + """ + return cls(name, + dic[SUBPACKAGE_KEY_SUBDIR], + dic[SUBPACKAGE_KEY_LICENSES]) diff --git a/cli/lib/bsp/subpackage_unittest.py b/cli/lib/bsp/subpackage_unittest.py index 4af6a12..e9dcedd 100644 --- a/cli/lib/bsp/subpackage_unittest.py +++ b/cli/lib/bsp/subpackage_unittest.py @@ -21,31 +21,33 @@ import unittest from bsp import subpackage class SubpackageTest(unittest.TestCase): - def setUp(self): - self.base_subpackage = { - 'subdir': 'path/to/subdir', - 'licenses': ['license1', 'license2'] - } - - - def test_from_dict(self): - subpkg = subpackage.Subpackage.from_dict(self.base_subpackage, 'subpackage') - self.assertIsInstance(subpkg, subpackage.Subpackage) - self.assertEqual(subpkg.name, 'subpackage') - self.assertEqual(subpkg.subdir, 'path/to/subdir') - self.assertEqual(subpkg.licenses, ['license1', 'license2']) - - def test_from_dict_noisy(self): - noisy_subpackage = self.base_subpackage - noisy_subpackage['not_a_real_key'] = 'not a real value' - subpkg = subpackage.Subpackage.from_dict(noisy_subpackage, 'subpackage') - self.assertIsInstance(subpkg, subpackage.Subpackage) - self.assertEqual(subpkg.name, 'subpackage') - self.assertEqual(subpkg.subdir, 'path/to/subdir') - self.assertEqual(subpkg.licenses, ['license1', 'license2']) - - def test_from_dict_incomplete(self): - incomplete_subpackage = self.base_subpackage - del incomplete_subpackage['subdir'] - self.assertRaisesRegexp(KeyError, 'subdir', subpackage.Subpackage.from_dict, - incomplete_subpackage, 'subpackage') + def setUp(self): + self.base_subpackage = { + 'subdir': 'path/to/subdir', + 'licenses': ['license1', 'license2'] + } + + + def test_from_dict(self): + subpkg = subpackage.Subpackage.from_dict(self.base_subpackage, + 'subpackage') + self.assertIsInstance(subpkg, subpackage.Subpackage) + self.assertEqual(subpkg.name, 'subpackage') + self.assertEqual(subpkg.subdir, 'path/to/subdir') + self.assertEqual(subpkg.licenses, ['license1', 'license2']) + + def test_from_dict_noisy(self): + noisy_subpackage = self.base_subpackage + noisy_subpackage['not_a_real_key'] = 'not a real value' + subpkg = subpackage.Subpackage.from_dict(noisy_subpackage, 'subpackage') + self.assertIsInstance(subpkg, subpackage.Subpackage) + self.assertEqual(subpkg.name, 'subpackage') + self.assertEqual(subpkg.subdir, 'path/to/subdir') + self.assertEqual(subpkg.licenses, ['license1', 'license2']) + + def test_from_dict_incomplete(self): + incomplete_subpackage = self.base_subpackage + del incomplete_subpackage['subdir'] + self.assertRaisesRegexp(KeyError, 'subdir', + subpackage.Subpackage.from_dict, + incomplete_subpackage, 'subpackage') |