author     Thiago Goncales <thiagog@chromium.org>   2013-07-17 10:26:35 -0700
committer  ChromeBot <chrome-bot@google.com>        2013-08-09 14:02:50 -0700
commit     f4acc429f96224bbe697e2edf137f7e030e7f240
tree       b3bb50933ed244a0c2d81617e0b85da81afe65fa /scripts/parallel_emerge.py
parent     a81d523bb5e29b02716075abc42e9e2a57d8d2e0
parallel_emerge: add --unpackonly.
The --unpackonly flag makes parallel_emerge fetch and then unpack all
packages, but not emerge them. This is accomplished by creating an unpack
queue which, when --unpackonly is set, is filled as the fetch queue
completes. Previously, when only the binary packages of a project were
needed, the workflow was to run --fetchonly and then extract each package
by hand. With --unpackonly the unpacking runs in parallel with the
fetching, making the whole workflow faster.

BUG=chromium:264240
TEST=Compared the sysroot produced with --unpackonly against one produced
with --fetchonly followed by unpacking all packages.

Change-Id: I3d9acb7032f036b786e3cba9843b7720471eecdb
Signed-off-by: Thiago Goncales <thiagog@chromium.org>
Reviewed-on: https://gerrit.chromium.org/gerrit/62856
Reviewed-by: David James <davidjames@chromium.org>
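At its core, the change chains a second consumer stage onto the fetch stage:
each completed fetch feeds an unpack queue that is drained by its own worker
pool. A minimal sketch of that producer/consumer pattern (hypothetical names;
the actual change below reuses EmergeWorker and the existing queue plumbing):

    import multiprocessing

    def fetch_worker(fetch_queue, unpack_queue):
      # Stand-in for the fetch stage: "fetch" a package, then hand it
      # downstream instead of stopping, which is what --unpackonly adds.
      while True:
        pkg = fetch_queue.get()
        if pkg is None:
          unpack_queue.put(None)  # propagate shutdown downstream
          return
        unpack_queue.put(pkg)  # fetched; ready to unpack

    def unpack_worker(unpack_queue):
      # Stand-in for the unpack stage; runs concurrently with fetching.
      while True:
        pkg = unpack_queue.get()
        if pkg is None:
          return
        print "Unpacked %s" % pkg  # the real code extracts a .tbz2 here

    if __name__ == "__main__":
      fetch_q = multiprocessing.Queue()
      unpack_q = multiprocessing.Queue()
      fetcher = multiprocessing.Process(target=fetch_worker,
                                        args=(fetch_q, unpack_q))
      unpacker = multiprocessing.Process(target=unpack_worker,
                                         args=(unpack_q,))
      fetcher.start()
      unpacker.start()
      for pkg in ["sys-apps/foo", "dev-libs/bar", None]:
        fetch_q.put(pkg)
      fetcher.join()
      unpacker.join()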
Diffstat (limited to 'scripts/parallel_emerge.py')
-rw-r--r--  scripts/parallel_emerge.py  143
1 file changed, 125 insertions(+), 18 deletions(-)
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 5e5833b00..f536b1892 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -29,6 +29,8 @@ import threading
 import time
 import traceback
 
+from chromite.lib import cros_build_lib
+
 # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
 # Chromium OS, the default "portage" user doesn't have the necessary
 # permissions. It'd be easier if we could default to $USERNAME, but $USERNAME
@@ -203,13 +205,14 @@ class DepGraphGenerator(object):
     PrintDepsMap(deps_graph)
   """
 
-  __slots__ = ["board", "emerge", "package_db", "show_output"]
+  __slots__ = ["board", "emerge", "package_db", "show_output", "unpack_only"]
 
   def __init__(self):
     self.board = None
     self.emerge = EmergeData()
     self.package_db = {}
     self.show_output = False
+    self.unpack_only = False
 
   def ParseParallelEmergeArgs(self, argv):
     """Read the parallel emerge arguments from the command-line.
@@ -239,6 +242,9 @@ class DepGraphGenerator(object):
         self.show_output = True
       elif arg == "--rebuild":
         emerge_args.append("--rebuild-if-unbuilt")
+      elif arg == "--unpackonly":
+        emerge_args.append("--fetchonly")
+        self.unpack_only = True
       else:
         # Not one of our options, so pass through to emerge.
         emerge_args.append(arg)
@@ -792,10 +798,10 @@ def PrintDepsMap(deps_map):
 class EmergeJobState(object):
   __slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
                "last_output_timestamp", "pkgname", "retcode", "start_timestamp",
-               "target", "fetch_only"]
+               "target", "fetch_only", "unpack_only"]
 
   def __init__(self, target, pkgname, done, filename, start_timestamp,
-               retcode=None, fetch_only=False):
+               retcode=None, fetch_only=False, unpack_only=False):
 
     # The full name of the target we're building (e.g.
     # chromeos-base/chromeos-0.0.1-r60)
@@ -833,6 +839,9 @@ class EmergeJobState(object):
     # The timestamp when our job started.
     self.start_timestamp = start_timestamp
 
+    # No emerge, only unpack packages.
+    self.unpack_only = unpack_only
+
 
 def KillHandler(_signum, _frame):
   # Kill self and all subprocesses.
@@ -916,7 +925,42 @@ def EmergeProcess(output, *args, **kwargs):
   # Return the exit code of the subprocess.
   return os.waitpid(pid, 0)[1]
 
-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+
+def UnpackPackage(pkg_state):
+  """Unpacks package described by pkg_state.
+
+  Args:
+    pkg_state: EmergeJobState object describing target.
+
+  Returns:
+    Exit code returned by subprocess.
+  """
+  pkgdir = os.environ.get("PKGDIR",
+                          os.path.join(os.environ["SYSROOT"], "packages"))
+  root = os.environ.get("ROOT", os.environ["SYSROOT"])
+  path = os.path.join(pkgdir, pkg_state.target + ".tbz2")
+  comp = cros_build_lib.FindCompressor(cros_build_lib.COMP_BZIP2)
+  cmd = [comp, "-dc"]
+  if comp.endswith("pbzip2"):
+    cmd.append("--ignore-trailing-garbage=1")
+  cmd.append(path)
+
+  result = cros_build_lib.RunCommand(cmd, cwd=root, stdout_to_pipe=True,
+                                     print_cmd=False, error_code_ok=True)
+
+  # If we were not successful, return now and don't attempt untar.
+  if result.returncode:
+    return result.returncode
+
+  cmd = ["sudo", "tar", "-xf", "-", "-C", root]
+  result = cros_build_lib.RunCommand(cmd, cwd=root, input=result.output,
+                                     print_cmd=False, error_code_ok=True)
+
+  return result.returncode
+
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False,
+                 unpack_only=False):
   """This worker emerges any packages given to it on the task_queue.
 
   Args:
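For reference, the unpack step amounts to decompressing the binary package
and untarring it into the root. A rough standard-library-only equivalent
(a sketch with a hypothetical helper name; the real UnpackPackage above
shells out to bzip2/pbzip2 and sudo tar so that extraction into the sysroot
can create root-owned files, and passes --ignore-trailing-garbage because
Portage binpkgs append xpak metadata after the bzip2 stream):

    import os
    import tarfile

    def unpack_tbz2(target, sysroot):
      # Sketch only: tarfile decompresses bzip2 in-process, but it runs
      # unprivileged and may reject the trailing xpak metadata that real
      # Portage binary packages carry.
      pkgdir = os.environ.get("PKGDIR", os.path.join(sysroot, "packages"))
      root = os.environ.get("ROOT", sysroot)
      path = os.path.join(pkgdir, target + ".tbz2")
      tar = tarfile.open(path, "r:bz2")
      try:
        tar.extractall(root)
      finally:
        tar.close()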
@@ -925,6 +969,7 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
     emerge: An EmergeData() object.
     package_db: A dict, mapping package ids to portage Package objects.
     fetch_only: A bool, indicating if we should just fetch the target.
+    unpack_only: A bool, indicating if we should just unpack the target.
 
   It expects package identifiers to be passed to it via task_queue. When
   a task is started, it pushes the (target, filename) to the started_queue.
@@ -982,16 +1027,19 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
     os.chmod(output.name, 644)
     start_timestamp = time.time()
     job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
-                         fetch_only=fetch_only)
+                         fetch_only=fetch_only, unpack_only=unpack_only)
     job_queue.put(job)
     if "--pretend" in opts:
       retcode = 0
     else:
       try:
         emerge.scheduler_graph.mergelist = install_list
-        retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
-                                spinner, favorites=emerge.favorites,
-                                graph_config=emerge.scheduler_graph)
+        if unpack_only:
+          retcode = UnpackPackage(pkg_state)
+        else:
+          retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
+                                  spinner, favorites=emerge.favorites,
+                                  graph_config=emerge.scheduler_graph)
       except Exception:
         traceback.print_exc(file=output)
         retcode = 1
@@ -1001,7 +1049,8 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
         return
 
     job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
-                         retcode, fetch_only=fetch_only)
+                         retcode, fetch_only=fetch_only,
+                         unpack_only=unpack_only)
     job_queue.put(job)
@@ -1176,7 +1225,7 @@ class ScoredHeap(object):
 class EmergeQueue(object):
   """Class to schedule emerge jobs according to a dependency graph."""
 
-  def __init__(self, deps_map, emerge, package_db, show_output):
+  def __init__(self, deps_map, emerge, package_db, show_output, unpack_only):
     # Store the dependency graph.
     self._deps_map = deps_map
     self._state_map = {}
@@ -1185,10 +1234,13 @@ class EmergeQueue(object):
     self._build_ready = ScoredHeap()
     self._fetch_jobs = {}
     self._fetch_ready = ScoredHeap()
+    self._unpack_jobs = {}
+    self._unpack_ready = ScoredHeap()
     # List of total package installs represented in deps_map.
     install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
     self._total_jobs = len(install_jobs)
     self._show_output = show_output
+    self._unpack_only = unpack_only
 
     if "--pretend" in emerge.opts:
       print "Skipping merge because of --pretend mode."
@@ -1207,7 +1259,7 @@ class EmergeQueue(object):
     # jobs.
     procs = min(self._total_jobs,
                 emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
-    self._build_procs = self._fetch_procs = max(1, procs)
+    self._build_procs = self._unpack_procs = self._fetch_procs = max(1, procs)
     self._load_avg = emerge.opts.pop("--load-average", None)
     self._job_queue = multiprocessing.Queue()
     self._print_queue = multiprocessing.Queue()
@@ -1222,6 +1274,14 @@ class EmergeQueue(object):
     self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
                                             args)
 
+    if self._unpack_only:
+      # Unpack pool only required on unpack_only jobs.
+      self._unpack_queue = multiprocessing.Queue()
+      args = (self._unpack_queue, self._job_queue, emerge, package_db, False,
+              True)
+      self._unpack_pool = multiprocessing.Pool(self._unpack_procs, EmergeWorker,
+                                               args)
+
     self._print_worker = multiprocessing.Process(target=PrintWorker,
                                                  args=[self._print_queue])
     self._print_worker.start()
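Note the multiprocessing.Pool construction here: the second and third
arguments are initializer and initargs, so every pool process runs
EmergeWorker at startup and sits in its own task loop rather than serving
map()-style work items. A toy version of the same wiring (hypothetical
names, not the chromite code):

    import multiprocessing

    def worker(task_queue, result_queue):
      # Runs as the Pool initializer in each child process and acts as
      # the worker loop itself, mirroring how EmergeWorker is launched.
      while True:
        task = task_queue.get()
        if task is None:  # sentinel: one per worker process
          return
        result_queue.put(task * task)  # stand-in for fetch/unpack/build

    if __name__ == "__main__":
      tasks = multiprocessing.Queue()
      results = multiprocessing.Queue()
      pool = multiprocessing.Pool(2, worker, (tasks, results))
      for i in range(4):
        tasks.put(i)
      for _ in range(2):
        tasks.put(None)
      print sorted(results.get() for _ in range(4))
      pool.terminate()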
@@ -1267,6 +1327,10 @@ class EmergeQueue(object):
     signal.signal(signal.SIGINT, ExitHandler)
     signal.signal(signal.SIGTERM, ExitHandler)
 
+  def _ScheduleUnpack(self, pkg_state):
+    self._unpack_jobs[pkg_state.target] = None
+    self._unpack_queue.put(pkg_state)
+
   def _Schedule(self, pkg_state):
     # We maintain a tree of all deps, if this doesn't need
     # to be installed just free up its children and continue.
@@ -1284,19 +1348,31 @@ class EmergeQueue(object):
       self._build_queue.put(pkg_state)
       return True
 
-  def _ScheduleLoop(self):
+  def _ScheduleLoop(self, unpack_only=False):
+    if unpack_only:
+      ready_queue = self._unpack_ready
+      jobs_queue = self._unpack_jobs
+      procs = self._unpack_procs
+    else:
+      ready_queue = self._build_ready
+      jobs_queue = self._build_jobs
+      procs = self._build_procs
+
     # If the current load exceeds our desired load average, don't schedule
     # more than one job.
     if self._load_avg and os.getloadavg()[0] > self._load_avg:
       needed_jobs = 1
     else:
-      needed_jobs = self._build_procs
+      needed_jobs = procs
 
     # Schedule more jobs.
-    while self._build_ready and len(self._build_jobs) < needed_jobs:
-      state = self._build_ready.get()
-      if state.target not in self._failed:
-        self._Schedule(state)
+    while ready_queue and len(jobs_queue) < needed_jobs:
+      state = ready_queue.get()
+      if unpack_only:
+        self._ScheduleUnpack(state)
+      else:
+        if state.target not in self._failed:
+          self._Schedule(state)
 
   def _Print(self, line):
     """Print a single line."""
@@ -1337,12 +1413,15 @@ class EmergeQueue(object):
     if no_output:
       seconds = current_time - GLOBAL_START
       fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
+      ujobs, uready = len(self._unpack_jobs), len(self._unpack_ready)
       bjobs, bready = len(self._build_jobs), len(self._build_ready)
       retries = len(self._retry_queue)
       pending = max(0, len(self._deps_map) - fjobs - bjobs)
       line = "Pending %s/%s, " % (pending, self._total_jobs)
       if fjobs or fready:
         line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
+      if ujobs or uready:
+        line += "Unpacking %s/%s, " % (ujobs, uready + ujobs)
       if bjobs or bready or retries:
         line += "Building %s/%s, " % (bjobs, bready + bjobs)
       if retries:
@@ -1405,6 +1484,10 @@ class EmergeQueue(object):
       _stop(self._build_queue, self._build_pool)
       self._build_queue = self._build_pool = None
 
+    if self._unpack_only:
+      _stop(self._unpack_queue, self._unpack_pool)
+      self._unpack_queue = self._unpack_pool = None
+
     if self._job_queue is not None:
       self._job_queue.close()
       self._job_queue = None
@@ -1444,6 +1527,8 @@ class EmergeQueue(object):
           self._job_queue.empty() and
           not self._fetch_jobs and
           not self._fetch_ready and
+          not self._unpack_jobs and
+          not self._unpack_ready and
           not self._build_jobs and
           not self._build_ready and
           self._deps_map):
@@ -1498,6 +1583,10 @@ class EmergeQueue(object):
             self._build_ready.put(state)
             self._ScheduleLoop()
 
+          if self._unpack_only and job.retcode == 0:
+            self._unpack_ready.put(state)
+            self._ScheduleLoop(unpack_only=True)
+
           if self._fetch_ready:
             state = self._fetch_ready.get()
             self._fetch_queue.put(state)
@@ -1508,6 +1597,23 @@ class EmergeQueue(object):
             self._fetch_queue.put(None)
           continue
 
+        if job.unpack_only:
+          if not job.done:
+            self._unpack_jobs[target] = job
+          else:
+            del self._unpack_jobs[target]
+            self._Print("Unpacked %s in %2.2fs"
+                        % (target, time.time() - job.start_timestamp))
+            if self._show_output or job.retcode != 0:
+              self._print_queue.put(JobPrinter(job, unlink=True))
+            else:
+              os.unlink(job.filename)
+            if self._unpack_ready:
+              state = self._unpack_ready.get()
+              self._unpack_queue.put(state)
+              self._unpack_jobs[state.target] = None
+          continue
+
         if not job.done:
           self._build_jobs[target] = job
           self._Print("Started %s (logged in %s)" % (target, job.filename))
@@ -1667,7 +1773,8 @@ def real_main(argv):
     os.execvp(args[0], args)
 
   # Run the queued emerges.
-  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
+  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output,
+                          deps.unpack_only)
   try:
     scheduler.Run()
   finally: