diff options
Diffstat (limited to 'image_chromeos.py')
-rwxr-xr-x | image_chromeos.py | 904 |
1 files changed, 490 insertions, 414 deletions
diff --git a/image_chromeos.py b/image_chromeos.py index d5c404dd..36965d05 100755 --- a/image_chromeos.py +++ b/image_chromeos.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# Copyright 2019 The Chromium OS Authors. All rights reserved. +# Copyright 2019 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -10,9 +10,8 @@ This script images a remote ChromeOS device with a specific image." """ -from __future__ import print_function -__author__ = 'asharif@google.com (Ahmad Sharif)' +__author__ = "asharif@google.com (Ahmad Sharif)" import argparse import filecmp @@ -31,462 +30,539 @@ from cros_utils import logger from cros_utils import misc from cros_utils.file_utils import FileUtils -checksum_file = '/usr/local/osimage_checksum_file' -lock_file = '/tmp/image_chromeos_lock/image_chromeos_lock' + +checksum_file = "/usr/local/osimage_checksum_file" +lock_file = "/tmp/image_chromeos_lock/image_chromeos_lock" def Usage(parser, message): - print('ERROR: %s' % message) - parser.print_help() - sys.exit(0) + print("ERROR: %s" % message) + parser.print_help() + sys.exit(0) def CheckForCrosFlash(chromeos_root, remote, log_level): - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - # Check to see if remote machine has cherrypy, ctypes - command = "python -c 'import cherrypy, ctypes'" - ret = cmd_executer.CrosRunCommand( - command, chromeos_root=chromeos_root, machine=remote) - logger.GetLogger().LogFatalIf( - ret == 255, 'Failed ssh to %s (for checking cherrypy)' % remote) - logger.GetLogger().LogFatalIf( - ret != 0, "Failed to find cherrypy or ctypes on remote '{}', " - 'cros flash cannot work.'.format(remote)) + # Check to see if remote machine has cherrypy, ctypes + command = "python -c 'import cherrypy, ctypes'" + ret = cmd_executer.CrosRunCommand( + command, chromeos_root=chromeos_root, machine=remote + ) + logger.GetLogger().LogFatalIf( + ret == 255, "Failed ssh to %s (for checking cherrypy)" % remote + ) + logger.GetLogger().LogFatalIf( + ret != 0, + "Failed to find cherrypy or ctypes on remote '{}', " + "cros flash cannot work.".format(remote), + ) def DisableCrosBeeps(chromeos_root, remote, log_level): - """Disable annoying chromebooks beeps after reboots.""" - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + """Disable annoying chromebooks beeps after reboots.""" + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - command = '/usr/share/vboot/bin/set_gbb_flags.sh 0x1' - logger.GetLogger().LogOutput('Trying to disable beeping.') + command = "/usr/share/vboot/bin/set_gbb_flags.sh 0x1" + logger.GetLogger().LogOutput("Trying to disable beeping.") - ret, o, _ = cmd_executer.CrosRunCommandWOutput( - command, chromeos_root=chromeos_root, machine=remote) - if ret != 0: - logger.GetLogger().LogOutput(o) - logger.GetLogger().LogOutput('Failed to disable beeps.') + ret, o, _ = cmd_executer.CrosRunCommandWOutput( + command, chromeos_root=chromeos_root, machine=remote + ) + if ret != 0: + logger.GetLogger().LogOutput(o) + logger.GetLogger().LogOutput("Failed to disable beeps.") def FindChromeOSImage(image_file, chromeos_root): - """Find path for ChromeOS image inside chroot. - - This function could be called with image paths that are either inside - or outside the chroot. In either case the path needs to be translated - to an real/absolute path inside the chroot. - Example input paths: - /usr/local/google/home/uname/chromeos/chroot/tmp/my-test-images/image - ~/trunk/src/build/images/board/latest/image - /tmp/peppy-release/R67-1235.0.0/image - - Corresponding example output paths: - /tmp/my-test-images/image - /home/uname/trunk/src/build/images/board/latest/image - /tmp/peppy-release/R67-1235.0,0/image - """ - - # Get the name of the user, for "/home/<user>" part of the path. - whoami = getpass.getuser() - # Get the full path for the chroot dir, including 'chroot' - real_chroot_dir = os.path.join(os.path.realpath(chromeos_root), 'chroot') - # Get the full path for the chromeos root, excluding 'chroot' - real_chromeos_root = os.path.realpath(chromeos_root) - - # If path name starts with real_chroot_dir, remove that piece, but assume - # the rest of the path is correct. - if image_file.find(real_chroot_dir) != -1: - chroot_image = image_file[len(real_chroot_dir):] - # If path name starts with chromeos_root, excluding 'chroot', replace the - # chromeos_root with the prefix: '/home/<username>/trunk'. - elif image_file.find(real_chromeos_root) != -1: - chroot_image = image_file[len(real_chromeos_root):] - chroot_image = '/home/%s/trunk%s' % (whoami, chroot_image) - # Else assume the path is already internal, so leave it alone. - else: - chroot_image = image_file - - return chroot_image - - -def DoImage(argv): - """Image ChromeOS.""" - - parser = argparse.ArgumentParser() - parser.add_argument( - '-c', - '--chromeos_root', - dest='chromeos_root', - help='Target directory for ChromeOS installation.') - parser.add_argument('-r', '--remote', dest='remote', help='Target device.') - parser.add_argument('-i', '--image', dest='image', help='Image binary file.') - parser.add_argument( - '-b', '--board', dest='board', help='Target board override.') - parser.add_argument( - '-f', - '--force', - dest='force', - action='store_true', - default=False, - help='Force an image even if it is non-test.') - parser.add_argument( - '-n', - '--no_lock', - dest='no_lock', - default=False, - action='store_true', - help='Do not attempt to lock remote before imaging. ' - 'This option should only be used in cases where the ' - 'exclusive lock has already been acquired (e.g. in ' - 'a script that calls this one).') - parser.add_argument( - '-l', - '--logging_level', - dest='log_level', - default='verbose', - help='Amount of logging to be used. Valid levels are ' - "'quiet', 'average', and 'verbose'.") - parser.add_argument('-a', '--image_args', dest='image_args') - - options = parser.parse_args(argv[1:]) - - if not options.log_level in command_executer.LOG_LEVEL: - Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'") - else: - log_level = options.log_level - - # Common initializations - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - l = logger.GetLogger() - - if options.chromeos_root is None: - Usage(parser, '--chromeos_root must be set') - - if options.remote is None: - Usage(parser, '--remote must be set') - - options.chromeos_root = os.path.expanduser(options.chromeos_root) - - if options.board is None: - board = cmd_executer.CrosLearnBoard(options.chromeos_root, options.remote) - else: - board = options.board - - if options.image is None: - images_dir = misc.GetImageDir(options.chromeos_root, board) - image = os.path.join(images_dir, 'latest', 'chromiumos_test_image.bin') - if not os.path.exists(image): - image = os.path.join(images_dir, 'latest', 'chromiumos_image.bin') - is_xbuddy_image = False - else: - image = options.image - is_xbuddy_image = image.startswith('xbuddy://') - if not is_xbuddy_image: - image = os.path.expanduser(image) - - if not is_xbuddy_image: - image = os.path.realpath(image) - - if not os.path.exists(image) and not is_xbuddy_image: - Usage(parser, 'Image file: ' + image + ' does not exist!') - - try: - should_unlock = False - if not options.no_lock: - try: - _ = locks.AcquireLock( - list(options.remote.split()), options.chromeos_root) - should_unlock = True - except Exception as e: - raise RuntimeError('Error acquiring machine: %s' % str(e)) - - reimage = False - local_image = False - if not is_xbuddy_image: - local_image = True - image_checksum = FileUtils().Md5File(image, log_level=log_level) + """Find path for ChromeOS image inside chroot. + + This function could be called with image paths that are either inside + or outside the chroot. In either case the path needs to be translated + to an real/absolute path inside the chroot. + Example input paths: + /usr/local/google/home/uname/chromeos/chroot/tmp/my-test-images/image + ~/trunk/src/build/images/board/latest/image + /tmp/peppy-release/R67-1235.0.0/image + + Corresponding example output paths: + /tmp/my-test-images/image + /home/uname/trunk/src/build/images/board/latest/image + /tmp/peppy-release/R67-1235.0,0/image + """ + + # Get the name of the user, for "/home/<user>" part of the path. + whoami = getpass.getuser() + # Get the full path for the chroot dir, including 'chroot' + real_chroot_dir = os.path.join(os.path.realpath(chromeos_root), "chroot") + # Get the full path for the chromeos root, excluding 'chroot' + real_chromeos_root = os.path.realpath(chromeos_root) + + # If path name starts with real_chroot_dir, remove that piece, but assume + # the rest of the path is correct. + if image_file.find(real_chroot_dir) != -1: + chroot_image = image_file[len(real_chroot_dir) :] + # If path name starts with chromeos_root, excluding 'chroot', replace the + # chromeos_root with the prefix: '/home/<username>/trunk'. + elif image_file.find(real_chromeos_root) != -1: + chroot_image = image_file[len(real_chromeos_root) :] + chroot_image = "/home/%s/trunk%s" % (whoami, chroot_image) + # Else assume the path is already internal, so leave it alone. + else: + chroot_image = image_file - command = 'cat ' + checksum_file - ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput( - command, chromeos_root=options.chromeos_root, machine=options.remote) + return chroot_image - device_checksum = device_checksum.strip() - image_checksum = str(image_checksum) - l.LogOutput('Image checksum: ' + image_checksum) - l.LogOutput('Device checksum: ' + device_checksum) +def DoImage(argv): + """Image ChromeOS.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "-c", + "--chromeos_root", + dest="chromeos_root", + help="Target directory for ChromeOS installation.", + ) + parser.add_argument("-r", "--remote", dest="remote", help="Target device.") + parser.add_argument( + "-i", "--image", dest="image", help="Image binary file." + ) + parser.add_argument( + "-b", "--board", dest="board", help="Target board override." + ) + parser.add_argument( + "-f", + "--force", + dest="force", + action="store_true", + default=False, + help="Force an image even if it is non-test.", + ) + parser.add_argument( + "-n", + "--no_lock", + dest="no_lock", + default=False, + action="store_true", + help="Do not attempt to lock remote before imaging. " + "This option should only be used in cases where the " + "exclusive lock has already been acquired (e.g. in " + "a script that calls this one).", + ) + parser.add_argument( + "-l", + "--logging_level", + dest="log_level", + default="verbose", + help="Amount of logging to be used. Valid levels are " + "'quiet', 'average', and 'verbose'.", + ) + parser.add_argument("-a", "--image_args", dest="image_args") + + options = parser.parse_args(argv[1:]) + + if not options.log_level in command_executer.LOG_LEVEL: + Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'") + else: + log_level = options.log_level - if image_checksum != device_checksum: - [found, located_image] = LocateOrCopyImage( - options.chromeos_root, image, board=board) + # Common initializations + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + l = logger.GetLogger() - reimage = True - l.LogOutput('Checksums do not match. Re-imaging...') + if options.chromeos_root is None: + Usage(parser, "--chromeos_root must be set") - chroot_image = FindChromeOSImage(located_image, options.chromeos_root) + if options.remote is None: + Usage(parser, "--remote must be set") - is_test_image = IsImageModdedForTest(options.chromeos_root, - chroot_image, log_level) + options.chromeos_root = os.path.expanduser(options.chromeos_root) - if not is_test_image and not options.force: - logger.GetLogger().LogFatal('Have to pass --force to image a ' - 'non-test image!') + if options.board is None: + board = cmd_executer.CrosLearnBoard( + options.chromeos_root, options.remote + ) else: - reimage = True - found = True - l.LogOutput('Using non-local image; Re-imaging...') - - if reimage: - # If the device has /tmp mounted as noexec, image_to_live.sh can fail. - command = 'mount -o remount,rw,exec /tmp' - cmd_executer.CrosRunCommand( - command, chromeos_root=options.chromeos_root, machine=options.remote) - - # Check to see if cros flash will work for the remote machine. - CheckForCrosFlash(options.chromeos_root, options.remote, log_level) - - # Disable the annoying chromebook beeps after reboot. - DisableCrosBeeps(options.chromeos_root, options.remote, log_level) - - cros_flash_args = [ - 'cros', 'flash', - '--board=%s' % board, '--clobber-stateful', options.remote - ] - if local_image: - cros_flash_args.append(chroot_image) - else: - cros_flash_args.append(image) - - command = ' '.join(cros_flash_args) - - # Workaround for crosbug.com/35684. - os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600) - - if log_level == 'average': - cmd_executer.SetLogLevel('verbose') - retries = 0 - while True: - if log_level == 'quiet': - l.LogOutput('CMD : %s' % command) - ret = cmd_executer.ChrootRunCommand( - options.chromeos_root, command, command_timeout=1800) - if ret == 0 or retries >= 2: - break - retries += 1 - if log_level == 'quiet': - l.LogOutput('Imaging failed. Retry # %d.' % retries) - - if log_level == 'average': - cmd_executer.SetLogLevel(log_level) - - logger.GetLogger().LogFatalIf(ret, 'Image command failed') - - # Unfortunately cros_image_to_target.py sometimes returns early when the - # machine isn't fully up yet. - ret = EnsureMachineUp(options.chromeos_root, options.remote, log_level) - - # If this is a non-local image, then the ret returned from - # EnsureMachineUp is the one that will be returned by this function; - # in that case, make sure the value in 'ret' is appropriate. - if not local_image and ret: - ret = 0 - else: - ret = 1 - - if local_image: - if log_level == 'average': - l.LogOutput('Verifying image.') - command = 'echo %s > %s && chmod -w %s' % (image_checksum, - checksum_file, checksum_file) - ret = cmd_executer.CrosRunCommand( - command, - chromeos_root=options.chromeos_root, - machine=options.remote) - logger.GetLogger().LogFatalIf(ret, 'Writing checksum failed.') - - successfully_imaged = VerifyChromeChecksum( - options.chromeos_root, chroot_image, options.remote, log_level) - logger.GetLogger().LogFatalIf(not successfully_imaged, - 'Image verification failed!') - TryRemountPartitionAsRW(options.chromeos_root, options.remote, - log_level) - - if not found: - temp_dir = os.path.dirname(located_image) - l.LogOutput('Deleting temp image dir: %s' % temp_dir) - shutil.rmtree(temp_dir) - l.LogOutput('Image updated.') + board = options.board + + if options.image is None: + images_dir = misc.GetImageDir(options.chromeos_root, board) + image = os.path.join(images_dir, "latest", "chromiumos_test_image.bin") + if not os.path.exists(image): + image = os.path.join(images_dir, "latest", "chromiumos_image.bin") + is_xbuddy_image = False else: - l.LogOutput('Checksums match, skip image update and reboot.') - command = 'reboot && exit' - _ = cmd_executer.CrosRunCommand( - command, chromeos_root=options.chromeos_root, machine=options.remote) - # Wait 30s after reboot. - time.sleep(30) - - finally: - if should_unlock: - locks.ReleaseLock(list(options.remote.split()), options.chromeos_root) + image = options.image + is_xbuddy_image = image.startswith("xbuddy://") + if not is_xbuddy_image: + image = os.path.expanduser(image) - return ret + if not is_xbuddy_image: + image = os.path.realpath(image) + + if not os.path.exists(image) and not is_xbuddy_image: + Usage(parser, "Image file: " + image + " does not exist!") + + try: + should_unlock = False + if not options.no_lock: + try: + _ = locks.AcquireLock( + list(options.remote.split()), options.chromeos_root + ) + should_unlock = True + except Exception as e: + raise RuntimeError("Error acquiring machine: %s" % str(e)) + + reimage = False + local_image = False + if not is_xbuddy_image: + local_image = True + image_checksum = FileUtils().Md5File(image, log_level=log_level) + + command = "cat " + checksum_file + ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput( + command, + chromeos_root=options.chromeos_root, + machine=options.remote, + ) + + device_checksum = device_checksum.strip() + image_checksum = str(image_checksum) + + l.LogOutput("Image checksum: " + image_checksum) + l.LogOutput("Device checksum: " + device_checksum) + + if image_checksum != device_checksum: + [found, located_image] = LocateOrCopyImage( + options.chromeos_root, image, board=board + ) + + reimage = True + l.LogOutput("Checksums do not match. Re-imaging...") + + chroot_image = FindChromeOSImage( + located_image, options.chromeos_root + ) + + is_test_image = IsImageModdedForTest( + options.chromeos_root, chroot_image, log_level + ) + + if not is_test_image and not options.force: + logger.GetLogger().LogFatal( + "Have to pass --force to image a " "non-test image!" + ) + else: + reimage = True + found = True + l.LogOutput("Using non-local image; Re-imaging...") + + if reimage: + # If the device has /tmp mounted as noexec, image_to_live.sh can fail. + command = "mount -o remount,rw,exec /tmp" + cmd_executer.CrosRunCommand( + command, + chromeos_root=options.chromeos_root, + machine=options.remote, + ) + + # Check to see if cros flash will work for the remote machine. + CheckForCrosFlash(options.chromeos_root, options.remote, log_level) + + # Disable the annoying chromebook beeps after reboot. + DisableCrosBeeps(options.chromeos_root, options.remote, log_level) + + cros_flash_args = [ + "cros", + "flash", + "--board=%s" % board, + "--clobber-stateful", + options.remote, + ] + if local_image: + cros_flash_args.append(chroot_image) + else: + cros_flash_args.append(image) + + command = " ".join(cros_flash_args) + + # Workaround for crosbug.com/35684. + os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600) + + if log_level == "average": + cmd_executer.SetLogLevel("verbose") + retries = 0 + while True: + if log_level == "quiet": + l.LogOutput("CMD : %s" % command) + ret = cmd_executer.ChrootRunCommand( + options.chromeos_root, command, command_timeout=1800 + ) + if ret == 0 or retries >= 2: + break + retries += 1 + if log_level == "quiet": + l.LogOutput("Imaging failed. Retry # %d." % retries) + + if log_level == "average": + cmd_executer.SetLogLevel(log_level) + + logger.GetLogger().LogFatalIf(ret, "Image command failed") + + # Unfortunately cros_image_to_target.py sometimes returns early when the + # machine isn't fully up yet. + ret = EnsureMachineUp( + options.chromeos_root, options.remote, log_level + ) + + # If this is a non-local image, then the ret returned from + # EnsureMachineUp is the one that will be returned by this function; + # in that case, make sure the value in 'ret' is appropriate. + if not local_image and ret: + ret = 0 + else: + ret = 1 + + if local_image: + if log_level == "average": + l.LogOutput("Verifying image.") + command = "echo %s > %s && chmod -w %s" % ( + image_checksum, + checksum_file, + checksum_file, + ) + ret = cmd_executer.CrosRunCommand( + command, + chromeos_root=options.chromeos_root, + machine=options.remote, + ) + logger.GetLogger().LogFatalIf(ret, "Writing checksum failed.") + + successfully_imaged = VerifyChromeChecksum( + options.chromeos_root, + chroot_image, + options.remote, + log_level, + ) + logger.GetLogger().LogFatalIf( + not successfully_imaged, "Image verification failed!" + ) + TryRemountPartitionAsRW( + options.chromeos_root, options.remote, log_level + ) + + if not found: + temp_dir = os.path.dirname(located_image) + l.LogOutput("Deleting temp image dir: %s" % temp_dir) + shutil.rmtree(temp_dir) + l.LogOutput("Image updated.") + else: + l.LogOutput("Checksums match, skip image update and reboot.") + command = "reboot && exit" + _ = cmd_executer.CrosRunCommand( + command, + chromeos_root=options.chromeos_root, + machine=options.remote, + ) + # Wait 30s after reboot. + time.sleep(30) + + finally: + if should_unlock: + locks.ReleaseLock( + list(options.remote.split()), options.chromeos_root + ) + + return ret def LocateOrCopyImage(chromeos_root, image, board=None): - l = logger.GetLogger() - if board is None: - board_glob = '*' - else: - board_glob = board - - chromeos_root_realpath = os.path.realpath(chromeos_root) - image = os.path.realpath(image) - - if image.startswith('%s/' % chromeos_root_realpath): - return [True, image] - - # First search within the existing build dirs for any matching files. - images_glob = ( - '%s/src/build/images/%s/*/*.bin' % (chromeos_root_realpath, board_glob)) - images_list = glob.glob(images_glob) - for potential_image in images_list: - if filecmp.cmp(potential_image, image): - l.LogOutput('Found matching image %s in chromeos_root.' % potential_image) - return [True, potential_image] - # We did not find an image. Copy it in the src dir and return the copied - # file. - if board is None: - board = '' - base_dir = ('%s/src/build/images/%s' % (chromeos_root_realpath, board)) - if not os.path.isdir(base_dir): - os.makedirs(base_dir) - temp_dir = tempfile.mkdtemp(prefix='%s/tmp' % base_dir) - new_image = '%s/%s' % (temp_dir, os.path.basename(image)) - l.LogOutput('No matching image found. Copying %s to %s' % (image, new_image)) - shutil.copyfile(image, new_image) - return [False, new_image] + l = logger.GetLogger() + if board is None: + board_glob = "*" + else: + board_glob = board + + chromeos_root_realpath = os.path.realpath(chromeos_root) + image = os.path.realpath(image) + + if image.startswith("%s/" % chromeos_root_realpath): + return [True, image] + + # First search within the existing build dirs for any matching files. + images_glob = "%s/src/build/images/%s/*/*.bin" % ( + chromeos_root_realpath, + board_glob, + ) + images_list = glob.glob(images_glob) + for potential_image in images_list: + if filecmp.cmp(potential_image, image): + l.LogOutput( + "Found matching image %s in chromeos_root." % potential_image + ) + return [True, potential_image] + # We did not find an image. Copy it in the src dir and return the copied + # file. + if board is None: + board = "" + base_dir = "%s/src/build/images/%s" % (chromeos_root_realpath, board) + if not os.path.isdir(base_dir): + os.makedirs(base_dir) + temp_dir = tempfile.mkdtemp(prefix="%s/tmp" % base_dir) + new_image = "%s/%s" % (temp_dir, os.path.basename(image)) + l.LogOutput( + "No matching image found. Copying %s to %s" % (image, new_image) + ) + shutil.copyfile(image, new_image) + return [False, new_image] def GetImageMountCommand(image, rootfs_mp, stateful_mp): - image_dir = os.path.dirname(image) - image_file = os.path.basename(image) - mount_command = ('cd /mnt/host/source/src/scripts &&' - './mount_gpt_image.sh --from=%s --image=%s' - ' --safe --read_only' - ' --rootfs_mountpt=%s' - ' --stateful_mountpt=%s' % (image_dir, image_file, rootfs_mp, - stateful_mp)) - return mount_command - - -def MountImage(chromeos_root, - image, - rootfs_mp, - stateful_mp, - log_level, - unmount=False, - extra_commands=''): - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - command = GetImageMountCommand(image, rootfs_mp, stateful_mp) - if unmount: - command = '%s --unmount' % command - if extra_commands: - command = '%s ; %s' % (command, extra_commands) - ret, out, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) - logger.GetLogger().LogFatalIf(ret, 'Mount/unmount command failed!') - return out + image_dir = os.path.dirname(image) + image_file = os.path.basename(image) + mount_command = ( + "cd /mnt/host/source/src/scripts &&" + "./mount_gpt_image.sh --from=%s --image=%s" + " --safe --read_only" + " --rootfs_mountpt=%s" + " --stateful_mountpt=%s" + % (image_dir, image_file, rootfs_mp, stateful_mp) + ) + return mount_command + + +def MountImage( + chromeos_root, + image, + rootfs_mp, + stateful_mp, + log_level, + unmount=False, + extra_commands="", +): + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + command = GetImageMountCommand(image, rootfs_mp, stateful_mp) + if unmount: + command = "%s --unmount" % command + if extra_commands: + command = "%s ; %s" % (command, extra_commands) + ret, out, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) + logger.GetLogger().LogFatalIf(ret, "Mount/unmount command failed!") + return out def IsImageModdedForTest(chromeos_root, image, log_level): - if log_level != 'verbose': - log_level = 'quiet' - command = 'mktemp -d' - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) - _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( - chromeos_root, command) - rootfs_mp = rootfs_mp.strip() - stateful_mp = stateful_mp.strip() - lsb_release_file = os.path.join(rootfs_mp, 'etc/lsb-release') - extra = ('grep CHROMEOS_RELEASE_TRACK %s | grep -i test' % lsb_release_file) - output = MountImage( - chromeos_root, - image, - rootfs_mp, - stateful_mp, - log_level, - extra_commands=extra) - is_test_image = re.search('test', output, re.IGNORECASE) - MountImage( - chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True) - return is_test_image + if log_level != "verbose": + log_level = "quiet" + command = "mktemp -d" + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( + chromeos_root, command + ) + _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( + chromeos_root, command + ) + rootfs_mp = rootfs_mp.strip() + stateful_mp = stateful_mp.strip() + lsb_release_file = os.path.join(rootfs_mp, "etc/lsb-release") + extra = "grep CHROMEOS_RELEASE_TRACK %s | grep -i test" % lsb_release_file + output = MountImage( + chromeos_root, + image, + rootfs_mp, + stateful_mp, + log_level, + extra_commands=extra, + ) + is_test_image = re.search("test", output, re.IGNORECASE) + MountImage( + chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True + ) + return is_test_image def VerifyChromeChecksum(chromeos_root, image, remote, log_level): - command = 'mktemp -d' - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) - _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( - chromeos_root, command) - rootfs_mp = rootfs_mp.strip() - stateful_mp = stateful_mp.strip() - chrome_file = '%s/opt/google/chrome/chrome' % rootfs_mp - extra = 'md5sum %s' % chrome_file - out = MountImage( - chromeos_root, - image, - rootfs_mp, - stateful_mp, - log_level, - extra_commands=extra) - image_chrome_checksum = out.strip().split()[0] - MountImage( - chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True) - - command = 'md5sum /opt/google/chrome/chrome' - [_, o, _] = cmd_executer.CrosRunCommandWOutput( - command, chromeos_root=chromeos_root, machine=remote) - device_chrome_checksum = o.split()[0] - return image_chrome_checksum.strip() == device_chrome_checksum.strip() + command = "mktemp -d" + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( + chromeos_root, command + ) + _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( + chromeos_root, command + ) + rootfs_mp = rootfs_mp.strip() + stateful_mp = stateful_mp.strip() + chrome_file = "%s/opt/google/chrome/chrome" % rootfs_mp + extra = "md5sum %s" % chrome_file + out = MountImage( + chromeos_root, + image, + rootfs_mp, + stateful_mp, + log_level, + extra_commands=extra, + ) + image_chrome_checksum = out.strip().split()[0] + MountImage( + chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True + ) + + command = "md5sum /opt/google/chrome/chrome" + [_, o, _] = cmd_executer.CrosRunCommandWOutput( + command, chromeos_root=chromeos_root, machine=remote + ) + device_chrome_checksum = o.split()[0] + return image_chrome_checksum.strip() == device_chrome_checksum.strip() # Remount partition as writable. # TODO: auto-detect if an image is built using --noenable_rootfs_verification. def TryRemountPartitionAsRW(chromeos_root, remote, log_level): - l = logger.GetLogger() - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - command = 'sudo mount -o remount,rw /' - ret = cmd_executer.CrosRunCommand(\ - command, chromeos_root=chromeos_root, machine=remote, - terminated_timeout=10) - if ret: - ## Safely ignore. - l.LogWarning('Failed to remount partition as rw, ' - 'probably the image was not built with ' - '"--noenable_rootfs_verification", ' - 'you can safely ignore this.') - else: - l.LogOutput('Re-mounted partition as writable.') + l = logger.GetLogger() + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + command = "sudo mount -o remount,rw /" + ret = cmd_executer.CrosRunCommand( + command, + chromeos_root=chromeos_root, + machine=remote, + terminated_timeout=10, + ) + if ret: + ## Safely ignore. + l.LogWarning( + "Failed to remount partition as rw, " + "probably the image was not built with " + '"--noenable_rootfs_verification", ' + "you can safely ignore this." + ) + else: + l.LogOutput("Re-mounted partition as writable.") def EnsureMachineUp(chromeos_root, remote, log_level): - l = logger.GetLogger() - cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) - timeout = 600 - magic = 'abcdefghijklmnopqrstuvwxyz' - command = 'echo %s' % magic - start_time = time.time() - while True: - current_time = time.time() - if current_time - start_time > timeout: - l.LogError( - 'Timeout of %ss reached. Machine still not up. Aborting.' % timeout) - return False - ret = cmd_executer.CrosRunCommand( - command, chromeos_root=chromeos_root, machine=remote) - if not ret: - return True + l = logger.GetLogger() + cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) + timeout = 600 + magic = "abcdefghijklmnopqrstuvwxyz" + command = "echo %s" % magic + start_time = time.time() + while True: + current_time = time.time() + if current_time - start_time > timeout: + l.LogError( + "Timeout of %ss reached. Machine still not up. Aborting." + % timeout + ) + return False + ret = cmd_executer.CrosRunCommand( + command, chromeos_root=chromeos_root, machine=remote + ) + if not ret: + return True -if __name__ == '__main__': - retval = DoImage(sys.argv) - sys.exit(retval) +if __name__ == "__main__": + retval = DoImage(sys.argv) + sys.exit(retval) |