aboutsummaryrefslogtreecommitdiff
path: root/cros_utils
diff options
context:
space:
mode:
Diffstat (limited to 'cros_utils')
-rwxr-xr-xcros_utils/bugs.py104
-rwxr-xr-xcros_utils/bugs_test.py124
-rwxr-xr-xcros_utils/command_executer.py142
-rwxr-xr-xcros_utils/email_sender.py16
-rwxr-xr-xcros_utils/email_sender_unittest.py10
-rw-r--r--cros_utils/manifest_versions.py162
-rw-r--r--cros_utils/misc.py109
7 files changed, 342 insertions, 325 deletions
diff --git a/cros_utils/bugs.py b/cros_utils/bugs.py
new file mode 100755
index 00000000..88fb7675
--- /dev/null
+++ b/cros_utils/bugs.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+# Copyright 2021 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Utilities to file bugs."""
+
+import base64
+import datetime
+import enum
+import json
+import os
+from typing import Any, Dict, List, Optional
+
+X20_PATH = '/google/data/rw/teams/c-compiler-chrome/prod_bugs'
+
+
+class WellKnownComponents(enum.IntEnum):
+ """A listing of "well-known" components recognized by our infra."""
+ CrOSToolchainPublic = -1
+ CrOSToolchainPrivate = -2
+
+
+def _WriteBugJSONFile(object_type: str, json_object: Dict[str, Any]):
+ """Writes a JSON file to X20_PATH with the given bug-ish object."""
+ final_object = {
+ 'type': object_type,
+ 'value': json_object,
+ }
+
+ # The name of this has two parts:
+ # - An easily sortable time, to provide uniqueness and let our service send
+ # things in the order they were put into the outbox.
+ # - 64 bits of entropy, so two racing bug writes don't clobber the same file.
+ now = datetime.datetime.utcnow().isoformat('T', 'seconds') + 'Z'
+ entropy = base64.urlsafe_b64encode(os.getrandom(8))
+ entropy_str = entropy.rstrip(b'=').decode('utf-8')
+ file_path = os.path.join(X20_PATH, f'{now}_{entropy_str}.json')
+
+ temp_path = file_path + '.in_progress'
+ try:
+ with open(temp_path, 'w') as f:
+ json.dump(final_object, f)
+ os.rename(temp_path, file_path)
+ except:
+ os.remove(temp_path)
+ raise
+ return file_path
+
+
+def AppendToExistingBug(bug_id: int, body: str):
+ """Sends a reply to an existing bug."""
+ _WriteBugJSONFile('AppendToExistingBugRequest', {
+ 'body': body,
+ 'bug_id': bug_id,
+ })
+
+
+def CreateNewBug(component_id: int,
+ title: str,
+ body: str,
+ assignee: Optional[str] = None,
+ cc: Optional[List[str]] = None):
+ """Sends a request to create a new bug.
+
+ Args:
+ component_id: The component ID to add. Anything from WellKnownComponents
+ also works.
+ title: Title of the bug. Must be nonempty.
+ body: Body of the bug. Must be nonempty.
+ assignee: Assignee of the bug. Must be either an email address, or a
+ "well-known" assignee (detective, mage).
+ cc: A list of emails to add to the CC list. Must either be an email
+ address, or a "well-known" individual (detective, mage).
+ """
+ obj = {
+ 'component_id': component_id,
+ 'subject': title,
+ 'body': body,
+ }
+
+ if assignee:
+ obj['assignee'] = assignee
+
+ if cc:
+ obj['cc'] = cc
+
+ _WriteBugJSONFile('FileNewBugRequest', obj)
+
+
+def SendCronjobLog(cronjob_name: str, failed: bool, message: str):
+ """Sends the record of a cronjob to our bug infra.
+
+ cronjob_name: The name of the cronjob. Expected to remain consistent over
+ time.
+ failed: Whether the job failed or not.
+ message: Any seemingly relevant context. This is pasted verbatim in a bug, if
+ the cronjob infra deems it worthy.
+ """
+ _WriteBugJSONFile('ChrotomationCronjobUpdate', {
+ 'name': cronjob_name,
+ 'message': message,
+ 'failed': failed,
+ })
diff --git a/cros_utils/bugs_test.py b/cros_utils/bugs_test.py
new file mode 100755
index 00000000..03dee64d
--- /dev/null
+++ b/cros_utils/bugs_test.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+# Copyright 2021 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# We're testing protected methods, so allow protected access.
+# pylint: disable=protected-access
+
+"""Tests bug filing bits."""
+
+import json
+import tempfile
+import unittest
+from unittest.mock import patch
+
+import bugs
+
+
+class Tests(unittest.TestCase):
+ """Tests for the bugs module."""
+ def testWritingJSONFileSeemsToWork(self):
+ """Tests JSON file writing."""
+ old_x20_path = bugs.X20_PATH
+
+ def restore_x20_path():
+ bugs.X20_PATH = old_x20_path
+
+ self.addCleanup(restore_x20_path)
+
+ with tempfile.TemporaryDirectory() as tempdir:
+ bugs.X20_PATH = tempdir
+ file_path = bugs._WriteBugJSONFile(
+ 'ObjectType', {
+ 'foo': 'bar',
+ 'baz': bugs.WellKnownComponents.CrOSToolchainPublic,
+ })
+
+ self.assertTrue(file_path.startswith(tempdir),
+ f'Expected {file_path} to start with {tempdir}')
+
+ with open(file_path) as f:
+ self.assertEqual(
+ json.load(f),
+ {
+ 'type': 'ObjectType',
+ 'value': {
+ 'foo': 'bar',
+ 'baz': int(bugs.WellKnownComponents.CrOSToolchainPublic),
+ },
+ },
+ )
+
+ @patch('bugs._WriteBugJSONFile')
+ def testAppendingToBugsSeemsToWork(self, mock_write_json_file):
+ """Tests AppendToExistingBug."""
+ bugs.AppendToExistingBug(1234, 'hello, world!')
+ mock_write_json_file.assert_called_once_with(
+ 'AppendToExistingBugRequest',
+ {
+ 'body': 'hello, world!',
+ 'bug_id': 1234,
+ },
+ )
+
+ @patch('bugs._WriteBugJSONFile')
+ def testBugCreationSeemsToWork(self, mock_write_json_file):
+ """Tests CreateNewBug."""
+ test_case_additions = (
+ {},
+ {
+ 'component_id': bugs.WellKnownComponents.CrOSToolchainPublic,
+ },
+ {
+ 'assignee': 'foo@gbiv.com',
+ 'cc': ['bar@baz.com'],
+ },
+ )
+
+ for additions in test_case_additions:
+ test_case = {
+ 'component_id': 123,
+ 'title': 'foo',
+ 'body': 'bar',
+ **additions,
+ }
+
+ bugs.CreateNewBug(**test_case)
+
+ expected_output = {
+ 'component_id': test_case['component_id'],
+ 'subject': test_case['title'],
+ 'body': test_case['body'],
+ }
+
+ assignee = test_case.get('assignee')
+ if assignee:
+ expected_output['assignee'] = assignee
+
+ cc = test_case.get('cc')
+ if cc:
+ expected_output['cc'] = cc
+
+ mock_write_json_file.assert_called_once_with(
+ 'FileNewBugRequest',
+ expected_output,
+ )
+ mock_write_json_file.reset_mock()
+
+ @patch('bugs._WriteBugJSONFile')
+ def testCronjobLogSendingSeemsToWork(self, mock_write_json_file):
+ """Tests SendCronjobLog."""
+ bugs.SendCronjobLog('my_name', False, 'hello, world!')
+ mock_write_json_file.assert_called_once_with(
+ 'ChrotomationCronjobUpdate',
+ {
+ 'name': 'my_name',
+ 'message': 'hello, world!',
+ 'failed': False,
+ },
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cros_utils/command_executer.py b/cros_utils/command_executer.py
index aeedf3ea..cc0f3372 100755
--- a/cros_utils/command_executer.py
+++ b/cros_utils/command_executer.py
@@ -103,14 +103,13 @@ class CommandExecuter(object):
p = None
try:
# pylint: disable=bad-option-value, subprocess-popen-preexec-fn
- p = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=True,
- preexec_fn=os.setsid,
- executable='/bin/bash',
- env=env)
+ p = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=True,
+ preexec_fn=os.setsid,
+ executable='/bin/bash',
+ env=env)
full_stdout = ''
full_stderr = ''
@@ -159,16 +158,17 @@ class CommandExecuter(object):
if p.poll() is not None:
if terminated_time is None:
terminated_time = time.time()
- elif (terminated_timeout is not None and
- time.time() - terminated_time > terminated_timeout):
+ elif (terminated_timeout is not None
+ and time.time() - terminated_time > terminated_timeout):
if self.logger:
self.logger.LogWarning(
'Timeout of %s seconds reached since '
- 'process termination.' % terminated_timeout, print_to_console)
+ 'process termination.' % terminated_timeout,
+ print_to_console)
break
- if (command_timeout is not None and
- time.time() - started_time > command_timeout):
+ if (command_timeout is not None
+ and time.time() - started_time > command_timeout):
os.killpg(os.getpgid(p.pid), signal.SIGTERM)
if self.logger:
self.logger.LogWarning(
@@ -242,9 +242,11 @@ class CommandExecuter(object):
return command
def WriteToTempShFile(self, contents):
- with tempfile.NamedTemporaryFile(
- 'w', encoding='utf-8', delete=False, prefix=os.uname()[1],
- suffix='.sh') as f:
+ with tempfile.NamedTemporaryFile('w',
+ encoding='utf-8',
+ delete=False,
+ prefix=os.uname()[1],
+ suffix='.sh') as f:
f.write('#!/bin/bash\n')
f.write(contents)
f.flush()
@@ -292,16 +294,15 @@ class CommandExecuter(object):
machine, port = machine.split(':')
# Write all commands to a file.
command_file = self.WriteToTempShFile(cmd)
- retval = self.CopyFiles(
- command_file,
- command_file,
- dest_machine=machine,
- dest_port=port,
- command_terminator=command_terminator,
- chromeos_root=chromeos_root,
- dest_cros=True,
- recursive=False,
- print_to_console=print_to_console)
+ retval = self.CopyFiles(command_file,
+ command_file,
+ dest_machine=machine,
+ dest_port=port,
+ command_terminator=command_terminator,
+ chromeos_root=chromeos_root,
+ dest_cros=True,
+ recursive=False,
+ print_to_console=print_to_console)
if retval:
if self.logger:
self.logger.LogError('Could not run remote command on machine.'
@@ -311,13 +312,12 @@ class CommandExecuter(object):
command = self.RemoteAccessInitCommand(chromeos_root, machine, port)
command += '\nremote_sh bash %s' % command_file
command += '\nl_retval=$?; echo "$REMOTE_OUT"; exit $l_retval'
- retval = self.RunCommandGeneric(
- command,
- return_output,
- command_terminator=command_terminator,
- command_timeout=command_timeout,
- terminated_timeout=terminated_timeout,
- print_to_console=print_to_console)
+ retval = self.RunCommandGeneric(command,
+ return_output,
+ command_terminator=command_terminator,
+ command_timeout=command_timeout,
+ terminated_timeout=terminated_timeout,
+ print_to_console=print_to_console)
if return_output:
connect_signature = ('Initiating first contact with remote host\n' +
'Connection OK\n')
@@ -372,13 +372,13 @@ class CommandExecuter(object):
if self.logger:
self.logger.LogCmd(command, print_to_console=print_to_console)
- with tempfile.NamedTemporaryFile(
- 'w',
- encoding='utf-8',
- delete=False,
- dir=os.path.join(chromeos_root, 'src/scripts'),
- suffix='.sh',
- prefix='in_chroot_cmd') as f:
+ with tempfile.NamedTemporaryFile('w',
+ encoding='utf-8',
+ delete=False,
+ dir=os.path.join(chromeos_root,
+ 'src/scripts'),
+ suffix='.sh',
+ prefix='in_chroot_cmd') as f:
f.write('#!/bin/bash\n')
f.write(command)
f.write('\n')
@@ -393,7 +393,11 @@ class CommandExecuter(object):
if return_output:
ret = self.RunCommand(
'cd %s; cros_sdk %s -- true' % (chromeos_root, cros_sdk_options),
- env=env)
+ env=env,
+ # Give this command a long time to execute; it might involve setting
+ # the chroot up, or running fstrim on its image file. Both of these
+ # operations can take well over the timeout default of 10 seconds.
+ terminated_timeout=5 * 60)
if ret:
return (ret, '', '')
@@ -402,14 +406,13 @@ class CommandExecuter(object):
command = ("cd %s; cros_sdk %s -- bash -c '%s/%s'" %
(chromeos_root, cros_sdk_options, CHROMEOS_SCRIPTS_DIR,
os.path.basename(command_file)))
- ret = self.RunCommandGeneric(
- command,
- return_output,
- command_terminator=command_terminator,
- command_timeout=command_timeout,
- terminated_timeout=terminated_timeout,
- print_to_console=print_to_console,
- env=env)
+ ret = self.RunCommandGeneric(command,
+ return_output,
+ command_terminator=command_terminator,
+ command_timeout=command_timeout,
+ terminated_timeout=terminated_timeout,
+ print_to_console=print_to_console,
+ env=env)
os.remove(command_file)
return ret
@@ -445,11 +448,10 @@ class CommandExecuter(object):
username=None,
command_terminator=None):
cmd = ' ;\n'.join(cmdlist)
- return self.RunCommand(
- cmd,
- machine=machine,
- username=username,
- command_terminator=command_terminator)
+ return self.RunCommand(cmd,
+ machine=machine,
+ username=username,
+ command_terminator=command_terminator)
def CopyFiles(self,
src,
@@ -505,12 +507,11 @@ class CommandExecuter(object):
else:
command += rsync_prefix + 'root@%s:%s %s' % (cros_machine, src, dest)
- return self.RunCommand(
- command,
- machine=host_machine,
- username=host_user,
- command_terminator=command_terminator,
- print_to_console=print_to_console)
+ return self.RunCommand(command,
+ machine=host_machine,
+ username=host_user,
+ command_terminator=command_terminator,
+ print_to_console=print_to_console)
if dest_machine == src_machine:
command = 'rsync -a %s %s' % (src, dest)
@@ -519,12 +520,11 @@ class CommandExecuter(object):
src_machine = os.uname()[1]
src_user = getpass.getuser()
command = 'rsync -a %s@%s:%s %s' % (src_user, src_machine, src, dest)
- return self.RunCommand(
- command,
- machine=dest_machine,
- username=dest_user,
- command_terminator=command_terminator,
- print_to_console=print_to_console)
+ return self.RunCommand(command,
+ machine=dest_machine,
+ username=dest_user,
+ command_terminator=command_terminator,
+ print_to_console=print_to_console)
def RunCommand2(self,
cmd,
@@ -593,8 +593,9 @@ class CommandExecuter(object):
def notify_line(self):
p = self._buf.find('\n')
while p >= 0:
- self._line_consumer(
- line=self._buf[:p + 1], output=self._name, pobject=self._pobject)
+ self._line_consumer(line=self._buf[:p + 1],
+ output=self._name,
+ pobject=self._pobject)
if p < len(self._buf) - 1:
self._buf = self._buf[p + 1:]
p = self._buf.find('\n')
@@ -606,8 +607,9 @@ class CommandExecuter(object):
def notify_eos(self):
# Notify end of stream. The last line may not end with a '\n'.
if self._buf != '':
- self._line_consumer(
- line=self._buf, output=self._name, pobject=self._pobject)
+ self._line_consumer(line=self._buf,
+ output=self._name,
+ pobject=self._pobject)
self._buf = ''
if self.log_level == 'verbose':
diff --git a/cros_utils/email_sender.py b/cros_utils/email_sender.py
index 6b8893ea..df8afbc4 100755
--- a/cros_utils/email_sender.py
+++ b/cros_utils/email_sender.py
@@ -71,7 +71,7 @@ class EmailSender(object):
"From" email address. Must be nonempty.
well_known_recipients: a list of well-known recipients for the email.
These are translated into addresses by our mailer.
- Current potential values for this are ('sheriff',
+ Current potential values for this are ('detective',
'cwp-team', 'cros-team', 'mage'). Either this or
direct_recipients must be a nonempty list.
direct_recipients: @google.com emails to send addresses to. Either this
@@ -89,8 +89,8 @@ class EmailSender(object):
type(well_known_recipients))
if not isinstance(direct_recipients, (tuple, list)):
- raise ValueError(
- '`direct_recipients` is unexpectedly a %s' % type(direct_recipients))
+ raise ValueError('`direct_recipients` is unexpectedly a %s' %
+ type(direct_recipients))
if not subject or not identifier:
raise ValueError('both `subject` and `identifier` must be nonempty')
@@ -205,8 +205,8 @@ class EmailSender(object):
to_be_deleted = []
try:
- with tempfile.NamedTemporaryFile(
- 'w', encoding='utf-8', delete=False) as f:
+ with tempfile.NamedTemporaryFile('w', encoding='utf-8',
+ delete=False) as f:
f.write(text_to_send)
f.flush()
to_be_deleted.append(f.name)
@@ -239,8 +239,10 @@ class EmailSender(object):
report_suffix = '_report.html'
else:
report_suffix = '_report.txt'
- with tempfile.NamedTemporaryFile(
- 'w', encoding='utf-8', delete=False, suffix=report_suffix) as f:
+ with tempfile.NamedTemporaryFile('w',
+ encoding='utf-8',
+ delete=False,
+ suffix=report_suffix) as f:
f.write(attachment.content)
f.flush()
attachment_files.append(f.name)
diff --git a/cros_utils/email_sender_unittest.py b/cros_utils/email_sender_unittest.py
index 73492196..ae41f143 100755
--- a/cros_utils/email_sender_unittest.py
+++ b/cros_utils/email_sender_unittest.py
@@ -69,7 +69,7 @@ class Test(unittest.TestCase):
'subject': 'foo',
'identifier': 'foo',
# non-list recipients
- 'well_known_recipients': 'sheriff',
+ 'well_known_recipients': 'detective',
'text_body': 'hi',
},
]
@@ -89,8 +89,8 @@ class Test(unittest.TestCase):
def actual_write_file(file_path):
nonlocal written_obj
- self.assertTrue(
- file_path.startswith(email_sender.X20_PATH + '/'), file_path)
+ self.assertTrue(file_path.startswith(email_sender.X20_PATH + '/'),
+ file_path)
f = io.StringIO()
yield f
written_obj = json.loads(f.getvalue())
@@ -99,7 +99,7 @@ class Test(unittest.TestCase):
email_sender.EmailSender().SendX20Email(
subject='hello',
identifier='world',
- well_known_recipients=['sheriff'],
+ well_known_recipients=['detective'],
direct_recipients=['gbiv@google.com'],
text_body='text',
html_body='html',
@@ -109,7 +109,7 @@ class Test(unittest.TestCase):
written_obj, {
'subject': 'hello',
'email_identifier': 'world',
- 'well_known_recipients': ['sheriff'],
+ 'well_known_recipients': ['detective'],
'direct_recipients': ['gbiv@google.com'],
'body': 'text',
'html_body': 'html',
diff --git a/cros_utils/manifest_versions.py b/cros_utils/manifest_versions.py
deleted file mode 100644
index 4838de3c..00000000
--- a/cros_utils/manifest_versions.py
+++ /dev/null
@@ -1,162 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Tools for searching/manipulating the manifests repository."""
-
-from __future__ import print_function
-
-__author__ = 'llozano@google.com (Luis Lozano)'
-
-import copy
-import os
-import re
-import shutil
-import tempfile
-import time
-
-from cros_utils import command_executer
-from cros_utils import logger
-
-MANIFEST_VERSION_MAIN_BRANCH = 'master'
-
-
-def IsCrosVersion(version):
- match = re.search(r'(\d+\.\d+\.\d+\.\d+)', version)
- return match is not None
-
-
-def IsRFormatCrosVersion(version):
- match = re.search(r'(R\d+-\d+\.\d+\.\d+)', version)
- return match is not None
-
-
-def RFormatCrosVersion(version):
- assert IsCrosVersion(version)
- tmp_major, tmp_minor = version.split('.', 1)
- rformat = 'R' + tmp_major + '-' + tmp_minor
- assert IsRFormatCrosVersion(rformat)
- return rformat
-
-
-class ManifestVersions(object):
- """This class handles interactions with the manifests repo."""
-
- def __init__(self, internal=True):
- self.internal = internal
- self.clone_location = tempfile.mkdtemp()
- self.ce = command_executer.GetCommandExecuter()
- if internal:
- versions_git = ('https://chrome-internal.googlesource.com/'
- 'chromeos/manifest-versions.git')
- else:
- versions_git = (
- 'https://chromium.googlesource.com/chromiumos/manifest-versions.git')
- commands = [
- 'cd {0}'.format(self.clone_location),
- 'git clone {0}'.format(versions_git)
- ]
- ret = self.ce.RunCommands(commands)
- if ret:
- logger.GetLogger().LogFatal('Failed to clone manifest-versions.')
-
- def __del__(self):
- if self.clone_location:
- shutil.rmtree(self.clone_location)
-
- def TimeToVersionChromeOS(self, my_time):
- """Convert timestamp to version number, in ChromeOS/Paladin."""
- cur_time = time.mktime(time.gmtime())
- des_time = float(my_time)
- if cur_time - des_time > 7000000:
- logger.GetLogger().LogFatal('The time you specify is too early.')
- commands = [
- 'cd {0}'.format(self.clone_location), 'cd manifest-versions',
- 'git checkout -f $(git rev-list' +
- ' --max-count=1 --before={0} origin/{1})'.format(
- my_time, MANIFEST_VERSION_MAIN_BRANCH)
- ]
- ret = self.ce.RunCommands(commands)
- if ret:
- logger.GetLogger().LogFatal('Failed to checkout manifest at '
- 'specified time')
- path = os.path.realpath('{0}/manifest-versions/LKGM/lkgm.xml'.format(
- self.clone_location))
- pp = path.split('/')
- new_list = copy.deepcopy(pp)
- for i, e in enumerate(pp):
- if e == 'android-LKGM-candidates':
- new_list[i] = 'paladin'
- chrome_path = '/'.join(new_list)
- if not os.path.exists(chrome_path):
- logger.GetLogger().LogOutput('LKGM path is %s' % path)
- logger.GetLogger().LogOutput('Cannot find path %s' % chrome_path)
- pieces = os.path.basename(chrome_path).split('.')
- pieces = pieces[:-2]
- new_base = '.'.join(pieces) + '*'
- wild_path = os.path.join('/', '/'.join(new_list[:-1]), new_base)
- command = 'ls %s' % wild_path
- ret, out, _ = self.ce.RunCommandWOutput(command)
- if ret == 0:
- out = out.strip()
- files = out.split('\n')
- latest = files[-1]
- small = os.path.basename(latest).split('.xml')[0]
- version = pp[-2] + '.' + small
- else:
- small = os.path.basename(path).split('.xml')[0]
- version = pp[-2] + '.' + small
- commands = [
- 'cd {0}'.format(self.clone_location), 'cd manifest-versions',
- 'git checkout {0}'.format(MANIFEST_VERSION_MAIN_BRANCH)
- ]
- self.ce.RunCommands(commands)
- return version
-
- def TimeToVersion(self, my_time):
- """Convert timestamp to version number."""
- cur_time = time.mktime(time.gmtime())
- des_time = float(my_time)
- if cur_time - des_time > 7000000:
- logger.GetLogger().LogFatal('The time you specify is too early.')
- commands = [
- 'cd {0}'.format(self.clone_location), 'cd manifest-versions',
- 'git checkout -f $(git rev-list' +
- ' --max-count=1 --before={0} origin/{1})'.format(
- my_time, MANIFEST_VERSION_MAIN_BRANCH)
- ]
- ret = self.ce.RunCommands(commands)
- if ret:
- logger.GetLogger().LogFatal('Failed to checkout manifest at '
- 'specified time')
- path = os.path.realpath('{0}/manifest-versions/LKGM/lkgm.xml'.format(
- self.clone_location))
- pp = path.split('/')
- small = os.path.basename(path).split('.xml')[0]
- version = pp[-2] + '.' + small
- commands = [
- 'cd {0}'.format(self.clone_location), 'cd manifest-versions',
- 'git checkout {0}'.format(MANIFEST_VERSION_MAIN_BRANCH)
- ]
- self.ce.RunCommands(commands)
- return version
-
- def GetManifest(self, version, to_file):
- """Get the manifest file from a given chromeos-internal version."""
- assert not IsRFormatCrosVersion(version)
- version = version.split('.', 1)[1]
- os.chdir(self.clone_location)
- files = [
- os.path.join(r, f)
- for r, _, fs in os.walk('.')
- for f in fs
- if version in f
- ]
- if files:
- command = 'cp {0} {1}'.format(files[0], to_file)
- ret = self.ce.RunCommand(command)
- if ret:
- raise RuntimeError('Cannot copy manifest to {0}'.format(to_file))
- else:
- raise RuntimeError('Version {0} is not available.'.format(version))
diff --git a/cros_utils/misc.py b/cros_utils/misc.py
index 93d1b3b6..a0d0de73 100644
--- a/cros_utils/misc.py
+++ b/cros_utils/misc.py
@@ -15,7 +15,6 @@ import os
import re
import shutil
import sys
-import traceback
from cros_utils import command_executer
from cros_utils import logger
@@ -24,8 +23,6 @@ CHROMEOS_SCRIPTS_DIR = '/mnt/host/source/src/scripts'
TOOLCHAIN_UTILS_PATH = ('/mnt/host/source/src/third_party/toolchain-utils/'
'cros_utils/toolchain_utils.sh')
-CROS_MAIN_BRANCH = 'cros/master'
-
def GetChromeOSVersionFromLSBVersion(lsb_version):
"""Get Chromeos version from Lsb version."""
@@ -112,8 +109,8 @@ def FormatQuotedCommand(command):
def FormatCommands(commands):
- return ApplySubs(
- str(commands), ('&&', '&&\n'), (';', ';\n'), (r'\n+\s*', '\n'))
+ return ApplySubs(str(commands), ('&&', '&&\n'), (';', ';\n'),
+ (r'\n+\s*', '\n'))
def GetImageDir(chromeos_root, board):
@@ -155,16 +152,16 @@ def GetBuildPackagesCommand(board, usepkg=False, debug=False):
withdebug_flag = '--nowithdebug'
return ('%s/build_packages %s --withdev --withtest --withautotest '
'--skip_toolchain_update %s --board=%s '
- '--accept_licenses=@CHROMEOS' % (CHROMEOS_SCRIPTS_DIR, usepkg_flag,
- withdebug_flag, board))
+ '--accept_licenses=@CHROMEOS' %
+ (CHROMEOS_SCRIPTS_DIR, usepkg_flag, withdebug_flag, board))
def GetBuildImageCommand(board, dev=False):
dev_args = ''
if dev:
dev_args = '--noenable_rootfs_verification --disk_layout=2gb-rootfs'
- return ('%s/build_image --board=%s %s test' % (CHROMEOS_SCRIPTS_DIR, board,
- dev_args))
+ return ('%s/build_image --board=%s %s test' %
+ (CHROMEOS_SCRIPTS_DIR, board, dev_args))
def GetSetupBoardCommand(board, usepkg=None, force=None):
@@ -193,8 +190,8 @@ def CanonicalizePath(path):
def GetCtargetFromBoard(board, chromeos_root):
"""Get Ctarget from board."""
base_board = board.split('_')[0]
- command = ('source %s; get_ctarget_from_board %s' % (TOOLCHAIN_UTILS_PATH,
- base_board))
+ command = ('source %s; get_ctarget_from_board %s' %
+ (TOOLCHAIN_UTILS_PATH, base_board))
ce = command_executer.GetCommandExecuter()
ret, out, _ = ce.ChrootRunCommandWOutput(chromeos_root, command)
if ret != 0:
@@ -207,8 +204,8 @@ def GetCtargetFromBoard(board, chromeos_root):
def GetArchFromBoard(board, chromeos_root):
"""Get Arch from board."""
base_board = board.split('_')[0]
- command = (
- 'source %s; get_board_arch %s' % (TOOLCHAIN_UTILS_PATH, base_board))
+ command = ('source %s; get_board_arch %s' %
+ (TOOLCHAIN_UTILS_PATH, base_board))
ce = command_executer.GetCommandExecuter()
ret, out, _ = ce.ChrootRunCommandWOutput(chromeos_root, command)
if ret != 0:
@@ -316,23 +313,22 @@ def WorkingDirectory(new_dir):
def HasGitStagedChanges(git_dir):
"""Return True if git repository has staged changes."""
- command = 'cd {0} && git diff --quiet --cached --exit-code HEAD'.format(
- git_dir)
+ command = f'cd {git_dir} && git diff --quiet --cached --exit-code HEAD'
return command_executer.GetCommandExecuter().RunCommand(
command, print_to_console=False)
def HasGitUnstagedChanges(git_dir):
"""Return True if git repository has un-staged changes."""
- command = 'cd {0} && git diff --quiet --exit-code HEAD'.format(git_dir)
+ command = f'cd {git_dir} && git diff --quiet --exit-code HEAD'
return command_executer.GetCommandExecuter().RunCommand(
command, print_to_console=False)
def HasGitUntrackedChanges(git_dir):
"""Return True if git repository has un-tracked changes."""
- command = ('cd {0} && test -z '
- '$(git ls-files --exclude-standard --others)').format(git_dir)
+ command = (f'cd {git_dir} && test -z '
+ '$(git ls-files --exclude-standard --others)')
return command_executer.GetCommandExecuter().RunCommand(
command, print_to_console=False)
@@ -352,8 +348,8 @@ def GitGetCommitHash(git_dir, commit_symbolic_name):
The git hash for the symbolic name or None if fails.
"""
- command = ('cd {0} && git log -n 1 --pretty="format:%H" {1}').format(
- git_dir, commit_symbolic_name)
+ command = (f'cd {git_dir} && git log -n 1'
+ f' --pretty="format:%H" {commit_symbolic_name}')
rv, out, _ = command_executer.GetCommandExecuter().RunCommandWOutput(
command, print_to_console=False)
if rv == 0:
@@ -393,7 +389,7 @@ def GetGitChangesAsList(git_dir, path=None, staged=False):
Returns:
A list containing all the changed files.
"""
- command = 'cd {0} && git diff --name-only'.format(git_dir)
+ command = f'cd {git_dir} && git diff --name-only'
if staged:
command += ' --cached'
if path:
@@ -408,8 +404,8 @@ def GetGitChangesAsList(git_dir, path=None, staged=False):
def IsChromeOsTree(chromeos_root):
return (os.path.isdir(
- os.path.join(chromeos_root, 'src/third_party/chromiumos-overlay')) and
- os.path.isdir(os.path.join(chromeos_root, 'manifest')))
+ os.path.join(chromeos_root, 'src/third_party/chromiumos-overlay'))
+ and os.path.isdir(os.path.join(chromeos_root, 'manifest')))
def DeleteChromeOsTree(chromeos_root, dry_run=False):
@@ -423,11 +419,10 @@ def DeleteChromeOsTree(chromeos_root, dry_run=False):
True if everything is ok.
"""
if not IsChromeOsTree(chromeos_root):
- logger.GetLogger().LogWarning(
- '"{0}" does not seem to be a valid chromeos tree, do nothing.'.format(
- chromeos_root))
+ logger.GetLogger().LogWarning(f'"{chromeos_root}" does not seem to be a'
+ ' valid chromeos tree, do nothing.')
return False
- cmd0 = 'cd {0} && cros_sdk --delete'.format(chromeos_root)
+ cmd0 = f'cd {chromeos_root} && cros_sdk --delete'
if dry_run:
print(cmd0)
else:
@@ -435,10 +430,10 @@ def DeleteChromeOsTree(chromeos_root, dry_run=False):
cmd0, print_to_console=True) != 0:
return False
- cmd1 = ('export CHROMEOSDIRNAME="$(dirname $(cd {0} && pwd))" && '
- 'export CHROMEOSBASENAME="$(basename $(cd {0} && pwd))" && '
- 'cd $CHROMEOSDIRNAME && sudo rm -fr $CHROMEOSBASENAME'
- ).format(chromeos_root)
+ cmd1 = (
+ f'export CHROMEOSDIRNAME="$(dirname $(cd {chromeos_root} && pwd))" && '
+ f'export CHROMEOSBASENAME="$(basename $(cd {chromeos_root} && pwd))" && '
+ 'cd $CHROMEOSDIRNAME && sudo rm -fr $CHROMEOSBASENAME')
if dry_run:
print(cmd1)
return True
@@ -447,54 +442,6 @@ def DeleteChromeOsTree(chromeos_root, dry_run=False):
cmd1, print_to_console=True) == 0
-def ApplyGerritPatches(chromeos_root,
- gerrit_patch_string,
- branch=CROS_MAIN_BRANCH):
- """Apply gerrit patches on a chromeos tree.
-
- Args:
- chromeos_root: chromeos tree path
- gerrit_patch_string: a patch string just like the one gives to cbuildbot,
- 'id1 id2 *id3 ... idn'. A prefix of '* means this is an internal patch.
- branch: the tree based on which to apply the patches.
-
- Returns:
- True if success.
- """
-
- ### First of all, we need chromite libs
- sys.path.append(os.path.join(chromeos_root, 'chromite'))
- # Imports below are ok after modifying path to add chromite.
- # Pylint cannot detect that and complains.
- # pylint: disable=import-error, import-outside-toplevel
- from lib import git
- from lib import gerrit
- manifest = git.ManifestCheckout(chromeos_root)
- patch_list = gerrit_patch_string.split(' ')
- ### This takes time, print log information.
- logger.GetLogger().LogOutput('Retrieving patch information from server ...')
- patch_info_list = gerrit.GetGerritPatchInfo(patch_list)
- for pi in patch_info_list:
- project_checkout = manifest.FindCheckout(pi.project, strict=False)
- if not project_checkout:
- logger.GetLogger().LogError(
- 'Failed to find patch project "{project}" in manifest.'.format(
- project=pi.project))
- return False
-
- pi_str = '{project}:{ref}'.format(project=pi.project, ref=pi.ref)
- try:
- project_git_path = project_checkout.GetPath(absolute=True)
- logger.GetLogger().LogOutput('Applying patch "{0}" in "{1}" ...'.format(
- pi_str, project_git_path))
- pi.Apply(project_git_path, branch, trivial=False)
- except Exception:
- traceback.print_exc(file=sys.stdout)
- logger.GetLogger().LogError('Failed to apply patch "{0}"'.format(pi_str))
- return False
- return True
-
-
def BooleanPrompt(prompt='Do you want to continue?',
default=True,
true_value='yes',
@@ -515,8 +462,8 @@ def BooleanPrompt(prompt='Do you want to continue?',
true_value, false_value = true_value.lower(), false_value.lower()
true_text, false_text = true_value, false_value
if true_value == false_value:
- raise ValueError(
- 'true_value and false_value must differ: got %r' % true_value)
+ raise ValueError('true_value and false_value must differ: got %r' %
+ true_value)
if default:
true_text = true_text[0].upper() + true_text[1:]