diff options
author | wbond <will@wbond.net> | 2018-04-18 06:28:53 -0400 |
---|---|---|
committer | wbond <will@wbond.net> | 2018-04-18 06:28:53 -0400 |
commit | b7784cdd53f25ae4f1560d82ea87cf228aea3852 (patch) | |
tree | c402b8ca289e68f6139a5c81e8598477e330ca67 /dev | |
parent | 09a03a09f8859085050466129ea3dbfaaeaa77eb (diff) | |
download | asn1crypto-b7784cdd53f25ae4f1560d82ea87cf228aea3852.tar.gz |
Updated deps.py
Diffstat (limited to 'dev')
-rw-r--r-- | dev/deps.py | 222 |
1 files changed, 179 insertions, 43 deletions
diff --git a/dev/deps.py b/dev/deps.py index 3340a66..8b06fc7 100644 --- a/dev/deps.py +++ b/dev/deps.py @@ -277,6 +277,182 @@ def _pep425tags(): return tags +def _open_archive(path): + """ + :param path: + A unicode string of the filesystem path to the archive + + :return: + An archive object + """ + + if path.endswith('.zip'): + return zipfile.ZipFile(path, 'r') + return tarfile.open(path, 'r') + + +def _list_archive_members(archive): + """ + :param archive: + An archive from _open_archive() + + :return: + A list of info objects to be used with _info_name() and _extract_info() + """ + + if isinstance(archive, zipfile.ZipFile): + return archive.infolist() + return archive.getmembers() + + +def _archive_single_dir(archive): + """ + Check if all members of the archive are in a single top-level directory + + :param archive: + An archive from _open_archive() + + :return: + None if not a single top level directory in archive, otherwise a + unicode string of the top level directory name + """ + + common_root = None + for info in _list_archive_members(archive): + fn = _info_name(info) + if fn in set(['.', '/']): + continue + sep = None + if '/' in fn: + sep = '/' + elif '\\' in fn: + sep = '\\' + if sep is None: + root_dir = fn + else: + root_dir, _ = fn.split(sep, 1) + if common_root is None: + common_root = root_dir + else: + if common_root != root_dir: + return None + return common_root + + +def _info_name(info): + """ + Returns a normalized file path for an archive info object + + :param info: + An info object from _list_archive_members() + + :return: + A unicode string with all directory separators normalized to "/" + """ + + if isinstance(info, zipfile.ZipInfo): + return info.filename.replace('\\', '/') + return info.name.replace('\\', '/') + + +def _extract_info(archive, info): + """ + Extracts the contents of an archive info object + + ;param archive: + An archive from _open_archive() + + :param info: + An info object from _list_archive_members() + + :return: + None, or a byte string of the file contents + """ + + if isinstance(archive, zipfile.ZipFile): + fn = info.filename + is_dir = fn.endswith('/') or fn.endswith('\\') + out = archive.read(info) + if is_dir and out == b'': + return None + return out + + info_file = archive.extractfile(info) + if info_file: + return info_file.read() + return None + + +def _extract_package(deps_dir, pkg_path): + """ + Extract a .whl, .zip, .tar.gz or .tar.bz2 into a package path to + use when running CI tasks + + :param deps_dir: + A unicode string of the directory the package should be extracted to + + :param pkg_path: + A unicode string of the path to the archive + """ + + if pkg_path.endswith('.whl'): + try: + zf = None + zf = zipfile.ZipFile(pkg_path, 'r') + # Wheels contain exactly what we need and nothing else + zf.extractall(deps_dir) + finally: + if zf: + zf.close() + return + + # Source archives may contain a bunch of other things. + # The following code works for the packages coverage and + # configparser, which are the two we currently require that + # do not provide wheels + + try: + ar = None + ar = _open_archive(pkg_path) + + pkg_name = None + base_path = _archive_single_dir(ar) or '' + if len(base_path): + if '-' in base_path: + pkg_name, _ = base_path.split('-', 1) + base_path += '/' + + base_pkg_path = None + if pkg_name is not None: + base_pkg_path = base_path + pkg_name + '/' + src_path = base_path + 'src/' + + members = [] + for info in _list_archive_members(ar): + fn = _info_name(info) + if base_pkg_path is not None and fn.startswith(base_pkg_path): + dst_path = fn[len(base_pkg_path) - len(pkg_name) - 1:] + members.append((info, dst_path)) + continue + if fn.startswith(src_path): + members.append((info, fn[len(src_path):])) + continue + + for info, path in members: + info_data = _extract_info(ar, info) + # Dirs won't return a file + if info_data is not None: + dst_path = os.path.join(deps_dir, path) + dst_dir = os.path.dirname(dst_path) + if not os.path.exists(dst_dir): + os.makedirs(dst_dir) + with open(dst_path, 'wb') as f: + f.write(info_data) + finally: + if ar: + ar.close() + + def _stage_requirements(deps_dir, path): """ Installs requirements without using Python to download, since @@ -296,7 +472,7 @@ def _stage_requirements(deps_dir, path): for p in packages: pkg = p['pkg'] if p['type'] == 'url': - if pkg.endswith('.zip') or pkg.endswith('.tar.gz') or pkg.endswith('.whl'): + if pkg.endswith('.zip') or pkg.endswith('.tar.gz') or pkg.endswith('.tar.bz2') or pkg.endswith('.whl'): url = pkg else: raise Exception('Unable to install package from URL that is not an archive') @@ -352,48 +528,8 @@ def _stage_requirements(deps_dir, path): raise Exception('Unable to find suitable download for %s' % pkg) local_path = _download(url, deps_dir) - if whl: - try: - zf = None - zf = zipfile.ZipFile(local_path, 'r') - # Wheels contain exactly what we need and nothing else - zf.extractall(deps_dir) - finally: - if zf: - zf.close() - else: - try: - tf = None - tf = tarfile.open(local_path, 'r') - # .tar.bz2 and .tar.gz may contain a bunch of other things. - # The following code works for the packages coverage and - # configparser, which are the two we currently require that - # do not provide wheels - base_path = pkg + '-' + version + '/' - base_pkg_path = base_path + pkg + '/' - src_path = base_path + 'src/' - members = [] - for ti in tf.getmembers(): - fn = ti.name - if fn.startswith(base_pkg_path) or fn.startswith(base_pkg_path.replace('/', '\\')): - members.append((ti, fn[len(base_pkg_path) - len(pkg) - 1:].replace('\\', '/'))) - continue - if fn.startswith(src_path) or fn.startswith(src_path.replace('/', '\\')): - members.append((ti, fn[len(src_path):].replace('\\', '/'))) - continue - for ti, path in members: - mf = tf.extractfile(ti) - # Dirs won't return a file - if mf: - dst_path = os.path.join(deps_dir, path) - dst_dir = os.path.dirname(dst_path) - if not os.path.exists(dst_dir): - os.makedirs(dst_dir) - with open(dst_path, 'wb') as f: - f.write(mf.read()) - finally: - if tf: - tf.close() + + _extract_package(deps_dir, local_path) os.remove(local_path) |