aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSen Jiang <senj@google.com>2018-09-24 15:13:29 -0700
committerSen Jiang <senj@google.com>2018-09-24 15:23:07 -0700
commit99229a1f152493924cb04d949952cc909a7f4378 (patch)
treea5986186fb80208eec329485870b264f7807129b
parent0bee58501fed842c2435b09e466211d5936d9e93 (diff)
downloadtools-99229a1f152493924cb04d949952cc909a7f4378.tar.gz
Add emmc_image.py
Test: None Change-Id: I361d160017efef6bc8e3a8c2f2405bdf844c1177
-rw-r--r--README.md27
-rwxr-xr-xemmc_image.py247
-rwxr-xr-xemmc_image_unittest.py205
3 files changed, 479 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c995872
--- /dev/null
+++ b/README.md
@@ -0,0 +1,27 @@
+emmc_image.py is a tool to generate an eMMC USER image.
+
+Requirement:
+
+* Python 2.7
+* simg2img
+ * Used to unsparse Android sparse images
+ * Either make it available in `PATH` or set `SIMG2IMG_BIN` to the path of
+ the `simg2img` binary
+
+
+**Factory image downloaded from console in both slots:**
+
+```
+./emmc_image.py --out emmc.zip --factory_files console_build.zip --all-slots
+```
+
+
+**Custom images in slot A, factory image downloaded from console in slot B:**
+
+```
+./emmc_image.py --out emmc.zip --factory_files console_build.zip \
+--all-slots \
+--partition boot_a custom_boot.img \
+--partition system_a custom_system.img \
+--partition vbmeta_a custom_vbmeta.img
+```
diff --git a/emmc_image.py b/emmc_image.py
new file mode 100755
index 0000000..2ae34be
--- /dev/null
+++ b/emmc_image.py
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import os
+import struct
+import shutil
+import subprocess
+import sys
+import tempfile
+import zipfile
+
+
+class PackedTuple(object):
+ """A base class for a named tuple packed into a binary string."""
+
+ def __init__(self, members, data=None):
+ """Initialize the instance with a given set of members.
+ Args:
+ members: The list of members of the tuple, as a pair of strings: struct
+ encoding and member name.
+ """
+ self._ptuple_fmt = '<' + ''.join(fldfmt for fldfmt, _ in members)
+ self._ptuple_flds = [fld for _, fld in members]
+ self._ptuple_size = struct.calcsize(self._ptuple_fmt)
+
+ values = ([None for _ in members] if data is None
+ else struct.unpack(self._ptuple_fmt, data[:self._ptuple_size]))
+ for value, fld in zip(values, self._ptuple_flds):
+ setattr(self, fld, value)
+
+ def __len__(self):
+ return self._ptuple_size
+
+ def __str__(self):
+ return struct.pack(self._ptuple_fmt,
+ *(getattr(self, fld) for fld in self._ptuple_flds))
+
+ def __repr__(self):
+ return '<%s ' % (type(self).__name__) + ' '.join(
+ '%s=%r' % (fld, getattr(self, fld)) for fld in self._ptuple_flds) + ' >'
+
+
+class GPTPartitionEntry(PackedTuple):
+ """A packed tuple representing a GPT partition entry."""
+
+ def __init__(self, data):
+ members = (
+ ('16s', 'guid'),
+ ('16s', 'uuid'),
+ ('Q', 'first_lba'),
+ ('Q', 'last_lba'),
+ ('Q', 'flags'),
+ ('72s', 'name'),
+ )
+ super(GPTPartitionEntry, self).__init__(members, data)
+ if data is None:
+ self.guid = '\0' * 16
+ self.uuid = '\0' * 16
+ self.name = ''
+
+
+class GPTPartitionTable(PackedTuple):
+ """A packed tuple representing the header of a GPT partition."""
+
+ def __init__(self, data):
+ members = (
+ ('8s', 'signature'),
+ ('I', 'revision'),
+ ('I', 'header_size'),
+ ('I', 'crc32'),
+ ('4s', '_pad'),
+ ('Q', 'current_lba'),
+ ('Q', 'backup_lba'),
+ ('Q', 'first_usable_lba'),
+ ('Q', 'last_usable_lba'),
+ ('16s', 'disk_guid'),
+ ('Q', 'part_entry_start_lba'),
+ ('I', 'num_part_entries'),
+ ('I', 'part_entry_size'),
+ ('I', 'crc32_part_array'),
+ )
+ super(GPTPartitionTable, self).__init__(members, data)
+ if data is None:
+ self.current_lba = 1
+
+
+def SparseImageExtract(image_file):
+ """Return a temporary file with the RAW image from the sparse image in
+ |image_file|.
+
+ If |image_file| isn't an Android sparse image returns None instead. The
+ temporary file will be deleted once closed.
+ """
+ SPARSE_HEADER_FMT = '<I4H'
+ header_size = struct.calcsize(SPARSE_HEADER_FMT)
+ try:
+ # magic, major_version, minor_version, file_hdr_sz, chunk_hdr_sz
+ header = struct.unpack(SPARSE_HEADER_FMT, image_file.read(header_size))
+ image_file.seek(-header_size, os.SEEK_CUR)
+ # This is the only combination supported, so we used it to identify sparse
+ # image files.
+ if header != (0xED26FF3A, 1, 0, 28, 12):
+ return
+ except IOError:
+ pass
+
+ temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(image_file.name))
+ simg2img_bin = os.environ.get('SIMG2IMG_BIN', 'simg2img')
+ subprocess.check_call([simg2img_bin, image_file.name, temp_file.name])
+ return temp_file
+
+
+def WritePartition(out_file, part_file, start_offset):
+ out_file.seek(start_offset);
+ # Autodetect sparse images if possible.
+ raw_file = SparseImageExtract(part_file)
+ source_file = raw_file if raw_file else part_file
+ shutil.copyfileobj(source_file, out_file)
+ return source_file.tell()
+
+
+def ExtractZips(zips, file_name):
+ for zip_path in zips:
+ if not zip_path:
+ continue
+ with zipfile.ZipFile(zip_path, 'r') as zip_file:
+ if file_name not in zip_file.namelist():
+ continue
+ with zip_file.open(file_name, 'r') as part_file:
+ temp_file = tempfile.NamedTemporaryFile()
+ shutil.copyfileobj(part_file, temp_file)
+ temp_file.seek(0)
+ return temp_file
+
+
+def GenerateEMMC(args):
+ """Assemble an EMMC raw image."""
+ if args.partition_table:
+ with open(args.partition_table, 'rb') as gpt_file:
+ partition_table = gpt_file.read()
+ else:
+ gpt_file = ExtractZips(args.tfzips, 'IMAGES/partition-table.img')
+
+ if not gpt_file:
+ gpt_file = ExtractZips([args.factory_files],
+ 'partition-table.img')
+
+ assert gpt_file, 'No partition_table provided'
+ partition_table = gpt_file.read()
+
+ gpt = GPTPartitionTable(partition_table[512:])
+ gpt_partitions = {}
+ for i in range(gpt.num_part_entries):
+ offset = gpt.part_entry_start_lba * 512 + i * gpt.part_entry_size
+ part = GPTPartitionEntry(partition_table[offset:])
+ part_name = str(part.name.decode('utf-16le').rstrip('\0'))
+ if part_name:
+ if part_name in gpt_partitions:
+ print 'Ignoring duplicate partition entry "%s"' % part_name
+ else:
+ gpt_partitions[part_name] = part
+
+ gpt_head = partition_table[:gpt.first_usable_lba * 512]
+ gpt_tail = partition_table[gpt.first_usable_lba * 512:]
+ gpt_tail_offset = (gpt.last_usable_lba + 1) * 512
+
+ out_file = tempfile.NamedTemporaryFile()
+ print("Creating image: %s" % out_file.name)
+ out_file.write(gpt_head)
+ out_file.seek(gpt_tail_offset)
+ out_file.write(gpt_tail)
+
+ partition_overrides = {}
+ if args.partition:
+ partition_overrides = {name: path for name, path in args.partition}
+ for part_name, part in gpt_partitions.items():
+ part_offset = part.first_lba * 512
+ part_size = (part.last_lba - part.first_lba + 1) * 512
+
+ if part_name.endswith('_a') or part_name.endswith('_b'):
+ if not args.all_slots and part_name.endswith('_b'):
+ continue
+ part_name_no_suffix = part_name[:-2]
+ else:
+ part_name_no_suffix = part_name
+
+ if part_name in partition_overrides:
+ part_file = open(partition_overrides[part_name], 'rb')
+ elif part_name_no_suffix in partition_overrides:
+ part_file = open(partition_overrides[part_name_no_suffix], 'rb')
+ elif part_name == 'userdata':
+ part_file = None
+ else:
+ part_file = ExtractZips(args.tfzips,
+ 'IMAGES/%s.img' % part_name_no_suffix)
+ if not part_file:
+ part_file = ExtractZips([args.factory_files],
+ '%s.img' % part_name_no_suffix)
+ if not part_file:
+ print ' Skipping partition "%s", no file provided' % (part_name,)
+ continue
+
+ print(' Copying partition "%s"' % (part_name))
+ copied_size = WritePartition(out_file, part_file, part_offset)
+ part_file.close()
+ assert copied_size <= part_size, \
+ 'Partition %s overflow; size is %d KiB but copied %d KiB' % (
+ part_name, part_size / 1024, copied_size / 1024)
+ print(' Partition "%s", copied size: %d KiB at offset %d KiB' %
+ (part_name, copied_size / 1024, part_offset / 1024))
+
+ out_file.flush()
+ with zipfile.ZipFile(args.out, 'w', zipfile.ZIP_DEFLATED, True) as out_zip:
+ print("Zipping image: %s" % args.out)
+ out_zip.write(out_file.name, 'emmc.img')
+ return 0
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description=GenerateEMMC.__doc__)
+ parser.add_argument('--out', metavar='OUTPUT', type=str, required=True,
+ help='The output zipped emmc image.')
+ parser.add_argument('--partition_table', help='path to the MBR+GPT image.')
+ parser.add_argument('--partition', nargs=2, metavar=('PARTITION_NAME','PATH'),
+ action='append', help='override partition images.')
+ parser.add_argument('--factory_files',
+ help='path to the factory_files or flashfiles zip')
+ parser.add_argument('tfzips', nargs='*', metavar='TARGET_FILES',
+ help='path to target_files zip(s)')
+ parser.add_argument('--all-slots', default=False, action='store_true',
+ help='copy the provided images to all slots')
+
+ sys.exit(GenerateEMMC(parser.parse_args()))
diff --git a/emmc_image_unittest.py b/emmc_image_unittest.py
new file mode 100755
index 0000000..8664d3f
--- /dev/null
+++ b/emmc_image_unittest.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import tempfile
+import unittest
+import zipfile
+
+import emmc_image
+
+
+class GPTPartitionTableTest(unittest.TestCase):
+
+ def testGPTPartitionTable(self):
+ gpt_header = ('EFI PART'
+ '\x00\x00\x01\x00'
+ '\\\x00\x00\x00'
+ '\x88R\xe9\x92'
+ '\x00\x00\x00\x00'
+ '\x01\x00\x00\x00\x00\x00\x00\x00'
+ '\xff\xff\x7f\x00\x00\x00\x00\x00'
+ '"\x00\x00\x00\x00\x00\x00\x00'
+ '\xde\xff\x7f\x00\x00\x00\x00\x00'
+ '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L'
+ '\x02\x00\x00\x00\x00\x00\x00\x00'
+ '\x80\x00\x00\x00'
+ '\x80\x00\x00\x00'
+ '\x93J\xf4]')
+ gpt = emmc_image.GPTPartitionTable(gpt_header)
+ self.assertEqual(gpt.signature, 'EFI PART')
+ self.assertEqual(gpt.revision, 65536)
+ self.assertEqual(gpt.header_size, len(gpt_header))
+ self.assertEqual(gpt.crc32, 0x92e95288)
+ self.assertEqual(gpt.current_lba, 1)
+ self.assertEqual(gpt.backup_lba, 0x7fffff)
+ self.assertEqual(gpt.first_usable_lba, 34)
+ self.assertEqual(gpt.last_usable_lba, 0x7fffde)
+ self.assertEqual(gpt.disk_guid, '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L')
+ self.assertEqual(gpt.part_entry_start_lba, 2)
+ self.assertEqual(gpt.num_part_entries, 128)
+ self.assertEqual(gpt.part_entry_size, 128)
+ self.assertEqual(gpt.crc32_part_array, 0x5df44a93)
+
+
+class GenerateEMMCTest(unittest.TestCase):
+
+ def setUp(self):
+ bpt = '''
+ {
+ "settings": {
+ "disk_size": "1 MiB"
+ },
+ "partitions": [
+ {
+ "ab": true,
+ "label": "boot",
+ "size": "10 KiB"
+ },
+ {
+ "ab": true,
+ "label": "system",
+ "size": "20 KiB"
+ },
+ {
+ "label": "misc",
+ "size": "1 KiB"
+ }
+ ]
+ }'''
+
+ bpt_file = tempfile.NamedTemporaryFile()
+ bpt_file.write(bpt)
+ bpt_file.flush()
+ self.gpt_file = tempfile.NamedTemporaryFile()
+ subprocess.check_call([
+ 'bpttool', 'make_table', '--input', bpt_file.name, '--output_gpt',
+ self.gpt_file.name
+ ])
+
+ boot_file = tempfile.NamedTemporaryFile()
+ self.boot = os.urandom(8 * 1024)
+ boot_file.write(self.boot)
+ boot_file.flush()
+
+ zip_file = tempfile.NamedTemporaryFile()
+ self.misc = os.urandom(300)
+ with zipfile.ZipFile(zip_file.name, 'w') as factory_files_zip:
+ factory_files_zip.writestr('misc.img', self.misc)
+ self.out_file = tempfile.NamedTemporaryFile()
+
+ class FakeArgs():
+
+ def __init__(self):
+ self.partition_table = ''
+ self.partition = [['boot', boot_file.name]]
+ self.factory_files = zip_file.name
+ self.tfzips = []
+ self.out = ''
+ self.all_slots = False
+
+ self.args = FakeArgs()
+ self.args.partition_table = self.gpt_file.name
+ self.args.out = self.out_file.name
+
+
+ def testGenerateEMMC(self):
+ self.assertEqual(emmc_image.GenerateEMMC(self.args), 0)
+
+ out_zip = zipfile.ZipFile(self.out_file.name, 'r')
+ emmc_file = out_zip.open('emmc.img', 'r')
+ emmc = emmc_file.read()
+ self.assertEqual(len(emmc), 1024 * 1024)
+
+ self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024))
+ boot_offset = 20 * 1024
+ boot_part_size = 10 * 1024
+ self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot)
+ # rest of boot_a should be 0
+ self.assertEqual(
+ emmc[boot_offset + len(self.boot):boot_offset + boot_part_size],
+ '\0' * (boot_part_size - len(self.boot)))
+ # padding between boot_a and boot_b should be 0
+ self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+ # unwritten boot_b should be 0
+ self.assertEqual(emmc[32 * 1024:42 * 1024], '\0' * 10 * 1024)
+ # padding between boot_b and system_a should be 0
+ self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+ # unwritten system_a should be 0
+ self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024)
+ # unwritten system_b should be 0
+ self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024)
+ misc_offset = 84 * 1024
+ misc_part_size = 1024
+ self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc)
+ # rest of misc should be 0
+ self.assertEqual(
+ emmc[misc_offset + len(self.misc):misc_offset + misc_part_size],
+ '\0' * (misc_part_size - len(self.misc)))
+ # secondary gpt
+ self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512))
+
+ def testParitionOverride(self):
+ boot_b_file = tempfile.NamedTemporaryFile()
+ boot_b = os.urandom(9 * 1024)
+ boot_b_file.write(boot_b)
+ boot_b_file.flush()
+
+ self.args.partition += [['boot_b', boot_b_file.name]]
+ self.args.all_slots = True
+ self.assertEqual(emmc_image.GenerateEMMC(self.args), 0)
+
+ out_zip = zipfile.ZipFile(self.out_file.name, 'r')
+ emmc_file = out_zip.open('emmc.img', 'r')
+ emmc = emmc_file.read()
+ self.assertEqual(len(emmc), 1024 * 1024)
+
+ self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024))
+ boot_offset = 20 * 1024
+ boot_part_size = 10 * 1024
+ self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot)
+ # rest of boot_a should be 0
+ self.assertEqual(
+ emmc[boot_offset + len(self.boot):boot_offset + boot_part_size],
+ '\0' * (boot_part_size - len(self.boot)))
+ # padding between boot_a and boot_b should be 0
+ self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+ boot_b_offset = 32 * 1024
+ self.assertEqual(emmc[boot_b_offset:boot_b_offset + len(boot_b)], boot_b)
+ # rest of boot_b should be 0
+ self.assertEqual(
+ emmc[boot_b_offset + len(boot_b):boot_b_offset + boot_part_size],
+ '\0' * (boot_part_size - len(boot_b)))
+ # padding between boot_b and system_a should be 0
+ self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+ # unwritten system_a should be 0
+ self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024)
+ # unwritten system_b should be 0
+ self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024)
+ misc_offset = 84 * 1024
+ misc_part_size = 1024
+ self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc)
+ # rest of misc should be 0
+ self.assertEqual(
+ emmc[misc_offset + len(self.misc):misc_offset + misc_part_size],
+ '\0' * (misc_part_size - len(self.misc)))
+ # secondary gpt
+ self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512))
+
+
+if __name__ == '__main__':
+ unittest.main()