diff options
author | Sen Jiang <senj@google.com> | 2018-09-24 15:13:29 -0700 |
---|---|---|
committer | Sen Jiang <senj@google.com> | 2018-09-24 15:23:07 -0700 |
commit | 99229a1f152493924cb04d949952cc909a7f4378 (patch) | |
tree | a5986186fb80208eec329485870b264f7807129b | |
parent | 0bee58501fed842c2435b09e466211d5936d9e93 (diff) | |
download | tools-99229a1f152493924cb04d949952cc909a7f4378.tar.gz |
Add emmc_image.py
Test: None
Change-Id: I361d160017efef6bc8e3a8c2f2405bdf844c1177
-rw-r--r-- | README.md | 27 | ||||
-rwxr-xr-x | emmc_image.py | 247 | ||||
-rwxr-xr-x | emmc_image_unittest.py | 205 |
3 files changed, 479 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 0000000..c995872 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +emmc_image.py is a tool to generate an eMMC USER image. + +Requirement: + +* Python 2.7 +* simg2img + * Used to unsparse Android sparse images + * Either make it available in `PATH` or set `SIMG2IMG_BIN` to the path of + the `simg2img` binary + + +**Factory image downloaded from console in both slots:** + +``` +./emmc_image.py --out emmc.zip --factory_files console_build.zip --all-slots +``` + + +**Custom images in slot A, factory image downloaded from console in slot B:** + +``` +./emmc_image.py --out emmc.zip --factory_files console_build.zip \ +--all-slots \ +--partition boot_a custom_boot.img \ +--partition system_a custom_system.img \ +--partition vbmeta_a custom_vbmeta.img +``` diff --git a/emmc_image.py b/emmc_image.py new file mode 100755 index 0000000..2ae34be --- /dev/null +++ b/emmc_image.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python +# +# Copyright 2017 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import struct +import shutil +import subprocess +import sys +import tempfile +import zipfile + + +class PackedTuple(object): + """A base class for a named tuple packed into a binary string.""" + + def __init__(self, members, data=None): + """Initialize the instance with a given set of members. + Args: + members: The list of members of the tuple, as a pair of strings: struct + encoding and member name. + """ + self._ptuple_fmt = '<' + ''.join(fldfmt for fldfmt, _ in members) + self._ptuple_flds = [fld for _, fld in members] + self._ptuple_size = struct.calcsize(self._ptuple_fmt) + + values = ([None for _ in members] if data is None + else struct.unpack(self._ptuple_fmt, data[:self._ptuple_size])) + for value, fld in zip(values, self._ptuple_flds): + setattr(self, fld, value) + + def __len__(self): + return self._ptuple_size + + def __str__(self): + return struct.pack(self._ptuple_fmt, + *(getattr(self, fld) for fld in self._ptuple_flds)) + + def __repr__(self): + return '<%s ' % (type(self).__name__) + ' '.join( + '%s=%r' % (fld, getattr(self, fld)) for fld in self._ptuple_flds) + ' >' + + +class GPTPartitionEntry(PackedTuple): + """A packed tuple representing a GPT partition entry.""" + + def __init__(self, data): + members = ( + ('16s', 'guid'), + ('16s', 'uuid'), + ('Q', 'first_lba'), + ('Q', 'last_lba'), + ('Q', 'flags'), + ('72s', 'name'), + ) + super(GPTPartitionEntry, self).__init__(members, data) + if data is None: + self.guid = '\0' * 16 + self.uuid = '\0' * 16 + self.name = '' + + +class GPTPartitionTable(PackedTuple): + """A packed tuple representing the header of a GPT partition.""" + + def __init__(self, data): + members = ( + ('8s', 'signature'), + ('I', 'revision'), + ('I', 'header_size'), + ('I', 'crc32'), + ('4s', '_pad'), + ('Q', 'current_lba'), + ('Q', 'backup_lba'), + ('Q', 'first_usable_lba'), + ('Q', 'last_usable_lba'), + ('16s', 'disk_guid'), + ('Q', 'part_entry_start_lba'), + ('I', 'num_part_entries'), + ('I', 'part_entry_size'), + ('I', 'crc32_part_array'), + ) + super(GPTPartitionTable, self).__init__(members, data) + if data is None: + self.current_lba = 1 + + +def SparseImageExtract(image_file): + """Return a temporary file with the RAW image from the sparse image in + |image_file|. + + If |image_file| isn't an Android sparse image returns None instead. The + temporary file will be deleted once closed. + """ + SPARSE_HEADER_FMT = '<I4H' + header_size = struct.calcsize(SPARSE_HEADER_FMT) + try: + # magic, major_version, minor_version, file_hdr_sz, chunk_hdr_sz + header = struct.unpack(SPARSE_HEADER_FMT, image_file.read(header_size)) + image_file.seek(-header_size, os.SEEK_CUR) + # This is the only combination supported, so we used it to identify sparse + # image files. + if header != (0xED26FF3A, 1, 0, 28, 12): + return + except IOError: + pass + + temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(image_file.name)) + simg2img_bin = os.environ.get('SIMG2IMG_BIN', 'simg2img') + subprocess.check_call([simg2img_bin, image_file.name, temp_file.name]) + return temp_file + + +def WritePartition(out_file, part_file, start_offset): + out_file.seek(start_offset); + # Autodetect sparse images if possible. + raw_file = SparseImageExtract(part_file) + source_file = raw_file if raw_file else part_file + shutil.copyfileobj(source_file, out_file) + return source_file.tell() + + +def ExtractZips(zips, file_name): + for zip_path in zips: + if not zip_path: + continue + with zipfile.ZipFile(zip_path, 'r') as zip_file: + if file_name not in zip_file.namelist(): + continue + with zip_file.open(file_name, 'r') as part_file: + temp_file = tempfile.NamedTemporaryFile() + shutil.copyfileobj(part_file, temp_file) + temp_file.seek(0) + return temp_file + + +def GenerateEMMC(args): + """Assemble an EMMC raw image.""" + if args.partition_table: + with open(args.partition_table, 'rb') as gpt_file: + partition_table = gpt_file.read() + else: + gpt_file = ExtractZips(args.tfzips, 'IMAGES/partition-table.img') + + if not gpt_file: + gpt_file = ExtractZips([args.factory_files], + 'partition-table.img') + + assert gpt_file, 'No partition_table provided' + partition_table = gpt_file.read() + + gpt = GPTPartitionTable(partition_table[512:]) + gpt_partitions = {} + for i in range(gpt.num_part_entries): + offset = gpt.part_entry_start_lba * 512 + i * gpt.part_entry_size + part = GPTPartitionEntry(partition_table[offset:]) + part_name = str(part.name.decode('utf-16le').rstrip('\0')) + if part_name: + if part_name in gpt_partitions: + print 'Ignoring duplicate partition entry "%s"' % part_name + else: + gpt_partitions[part_name] = part + + gpt_head = partition_table[:gpt.first_usable_lba * 512] + gpt_tail = partition_table[gpt.first_usable_lba * 512:] + gpt_tail_offset = (gpt.last_usable_lba + 1) * 512 + + out_file = tempfile.NamedTemporaryFile() + print("Creating image: %s" % out_file.name) + out_file.write(gpt_head) + out_file.seek(gpt_tail_offset) + out_file.write(gpt_tail) + + partition_overrides = {} + if args.partition: + partition_overrides = {name: path for name, path in args.partition} + for part_name, part in gpt_partitions.items(): + part_offset = part.first_lba * 512 + part_size = (part.last_lba - part.first_lba + 1) * 512 + + if part_name.endswith('_a') or part_name.endswith('_b'): + if not args.all_slots and part_name.endswith('_b'): + continue + part_name_no_suffix = part_name[:-2] + else: + part_name_no_suffix = part_name + + if part_name in partition_overrides: + part_file = open(partition_overrides[part_name], 'rb') + elif part_name_no_suffix in partition_overrides: + part_file = open(partition_overrides[part_name_no_suffix], 'rb') + elif part_name == 'userdata': + part_file = None + else: + part_file = ExtractZips(args.tfzips, + 'IMAGES/%s.img' % part_name_no_suffix) + if not part_file: + part_file = ExtractZips([args.factory_files], + '%s.img' % part_name_no_suffix) + if not part_file: + print ' Skipping partition "%s", no file provided' % (part_name,) + continue + + print(' Copying partition "%s"' % (part_name)) + copied_size = WritePartition(out_file, part_file, part_offset) + part_file.close() + assert copied_size <= part_size, \ + 'Partition %s overflow; size is %d KiB but copied %d KiB' % ( + part_name, part_size / 1024, copied_size / 1024) + print(' Partition "%s", copied size: %d KiB at offset %d KiB' % + (part_name, copied_size / 1024, part_offset / 1024)) + + out_file.flush() + with zipfile.ZipFile(args.out, 'w', zipfile.ZIP_DEFLATED, True) as out_zip: + print("Zipping image: %s" % args.out) + out_zip.write(out_file.name, 'emmc.img') + return 0 + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description=GenerateEMMC.__doc__) + parser.add_argument('--out', metavar='OUTPUT', type=str, required=True, + help='The output zipped emmc image.') + parser.add_argument('--partition_table', help='path to the MBR+GPT image.') + parser.add_argument('--partition', nargs=2, metavar=('PARTITION_NAME','PATH'), + action='append', help='override partition images.') + parser.add_argument('--factory_files', + help='path to the factory_files or flashfiles zip') + parser.add_argument('tfzips', nargs='*', metavar='TARGET_FILES', + help='path to target_files zip(s)') + parser.add_argument('--all-slots', default=False, action='store_true', + help='copy the provided images to all slots') + + sys.exit(GenerateEMMC(parser.parse_args())) diff --git a/emmc_image_unittest.py b/emmc_image_unittest.py new file mode 100755 index 0000000..8664d3f --- /dev/null +++ b/emmc_image_unittest.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python +# +# Copyright 2017 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import subprocess +import tempfile +import unittest +import zipfile + +import emmc_image + + +class GPTPartitionTableTest(unittest.TestCase): + + def testGPTPartitionTable(self): + gpt_header = ('EFI PART' + '\x00\x00\x01\x00' + '\\\x00\x00\x00' + '\x88R\xe9\x92' + '\x00\x00\x00\x00' + '\x01\x00\x00\x00\x00\x00\x00\x00' + '\xff\xff\x7f\x00\x00\x00\x00\x00' + '"\x00\x00\x00\x00\x00\x00\x00' + '\xde\xff\x7f\x00\x00\x00\x00\x00' + '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L' + '\x02\x00\x00\x00\x00\x00\x00\x00' + '\x80\x00\x00\x00' + '\x80\x00\x00\x00' + '\x93J\xf4]') + gpt = emmc_image.GPTPartitionTable(gpt_header) + self.assertEqual(gpt.signature, 'EFI PART') + self.assertEqual(gpt.revision, 65536) + self.assertEqual(gpt.header_size, len(gpt_header)) + self.assertEqual(gpt.crc32, 0x92e95288) + self.assertEqual(gpt.current_lba, 1) + self.assertEqual(gpt.backup_lba, 0x7fffff) + self.assertEqual(gpt.first_usable_lba, 34) + self.assertEqual(gpt.last_usable_lba, 0x7fffde) + self.assertEqual(gpt.disk_guid, '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L') + self.assertEqual(gpt.part_entry_start_lba, 2) + self.assertEqual(gpt.num_part_entries, 128) + self.assertEqual(gpt.part_entry_size, 128) + self.assertEqual(gpt.crc32_part_array, 0x5df44a93) + + +class GenerateEMMCTest(unittest.TestCase): + + def setUp(self): + bpt = ''' + { + "settings": { + "disk_size": "1 MiB" + }, + "partitions": [ + { + "ab": true, + "label": "boot", + "size": "10 KiB" + }, + { + "ab": true, + "label": "system", + "size": "20 KiB" + }, + { + "label": "misc", + "size": "1 KiB" + } + ] + }''' + + bpt_file = tempfile.NamedTemporaryFile() + bpt_file.write(bpt) + bpt_file.flush() + self.gpt_file = tempfile.NamedTemporaryFile() + subprocess.check_call([ + 'bpttool', 'make_table', '--input', bpt_file.name, '--output_gpt', + self.gpt_file.name + ]) + + boot_file = tempfile.NamedTemporaryFile() + self.boot = os.urandom(8 * 1024) + boot_file.write(self.boot) + boot_file.flush() + + zip_file = tempfile.NamedTemporaryFile() + self.misc = os.urandom(300) + with zipfile.ZipFile(zip_file.name, 'w') as factory_files_zip: + factory_files_zip.writestr('misc.img', self.misc) + self.out_file = tempfile.NamedTemporaryFile() + + class FakeArgs(): + + def __init__(self): + self.partition_table = '' + self.partition = [['boot', boot_file.name]] + self.factory_files = zip_file.name + self.tfzips = [] + self.out = '' + self.all_slots = False + + self.args = FakeArgs() + self.args.partition_table = self.gpt_file.name + self.args.out = self.out_file.name + + + def testGenerateEMMC(self): + self.assertEqual(emmc_image.GenerateEMMC(self.args), 0) + + out_zip = zipfile.ZipFile(self.out_file.name, 'r') + emmc_file = out_zip.open('emmc.img', 'r') + emmc = emmc_file.read() + self.assertEqual(len(emmc), 1024 * 1024) + + self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024)) + boot_offset = 20 * 1024 + boot_part_size = 10 * 1024 + self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot) + # rest of boot_a should be 0 + self.assertEqual( + emmc[boot_offset + len(self.boot):boot_offset + boot_part_size], + '\0' * (boot_part_size - len(self.boot))) + # padding between boot_a and boot_b should be 0 + self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024) + # unwritten boot_b should be 0 + self.assertEqual(emmc[32 * 1024:42 * 1024], '\0' * 10 * 1024) + # padding between boot_b and system_a should be 0 + self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024) + # unwritten system_a should be 0 + self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024) + # unwritten system_b should be 0 + self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024) + misc_offset = 84 * 1024 + misc_part_size = 1024 + self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc) + # rest of misc should be 0 + self.assertEqual( + emmc[misc_offset + len(self.misc):misc_offset + misc_part_size], + '\0' * (misc_part_size - len(self.misc))) + # secondary gpt + self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512)) + + def testParitionOverride(self): + boot_b_file = tempfile.NamedTemporaryFile() + boot_b = os.urandom(9 * 1024) + boot_b_file.write(boot_b) + boot_b_file.flush() + + self.args.partition += [['boot_b', boot_b_file.name]] + self.args.all_slots = True + self.assertEqual(emmc_image.GenerateEMMC(self.args), 0) + + out_zip = zipfile.ZipFile(self.out_file.name, 'r') + emmc_file = out_zip.open('emmc.img', 'r') + emmc = emmc_file.read() + self.assertEqual(len(emmc), 1024 * 1024) + + self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024)) + boot_offset = 20 * 1024 + boot_part_size = 10 * 1024 + self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot) + # rest of boot_a should be 0 + self.assertEqual( + emmc[boot_offset + len(self.boot):boot_offset + boot_part_size], + '\0' * (boot_part_size - len(self.boot))) + # padding between boot_a and boot_b should be 0 + self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024) + boot_b_offset = 32 * 1024 + self.assertEqual(emmc[boot_b_offset:boot_b_offset + len(boot_b)], boot_b) + # rest of boot_b should be 0 + self.assertEqual( + emmc[boot_b_offset + len(boot_b):boot_b_offset + boot_part_size], + '\0' * (boot_part_size - len(boot_b))) + # padding between boot_b and system_a should be 0 + self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024) + # unwritten system_a should be 0 + self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024) + # unwritten system_b should be 0 + self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024) + misc_offset = 84 * 1024 + misc_part_size = 1024 + self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc) + # rest of misc should be 0 + self.assertEqual( + emmc[misc_offset + len(self.misc):misc_offset + misc_part_size], + '\0' * (misc_part_size - len(self.misc))) + # secondary gpt + self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512)) + + +if __name__ == '__main__': + unittest.main() |