author | Thiago Goncales <thiagog@chromium.org> | 2013-07-17 10:26:35 -0700
committer | ChromeBot <chrome-bot@google.com> | 2013-08-09 14:02:50 -0700
commit | f4acc429f96224bbe697e2edf137f7e030e7f240 (patch)
tree | b3bb50933ed244a0c2d81617e0b85da81afe65fa /scripts/parallel_emerge.py
parent | a81d523bb5e29b02716075abc42e9e2a57d8d2e0 (diff)
download | chromite-f4acc429f96224bbe697e2edf137f7e030e7f240.tar.gz
parallel_emerge: add --unpackonly.
The --unpackonly flag makes parallel_emerge fetch and then unpack all
packages, but not emerge them. This is accomplished by adding an unpack
queue which, when --unpackonly is set, is filled as fetch jobs complete.
When we only want to work with a project's binary packages, the current
workflow is to run --fetchonly and then extract each package by hand. By
adding --unpackonly, the unpacking runs in parallel with the fetching,
making the entire workflow faster.
BUG=chromium:264240
TEST=Compared the sysroot produced with --unpackonly against one produced
with --fetchonly followed by unpacking all packages.
Change-Id: I3d9acb7032f036b786e3cba9843b7720471eecdb
Signed-off-by: Thiago Goncales <thiagog@chromium.org>
Reviewed-on: https://gerrit.chromium.org/gerrit/62856
Reviewed-by: David James <davidjames@chromium.org>
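The scheduling idea above boils down to a small producer/consumer pipeline: fetch workers hand each successfully downloaded package to an unpack queue, so unpacking overlaps with fetching instead of running as a second serial pass. The snippet below is a minimal, self-contained model of that idea only; the worker functions, queue names, and sleeps are illustrative stand-ins and are not the chromite classes used in the patch.

# Minimal model of the --unpackonly scheduling idea: fetch workers feed an
# unpack queue as soon as each package finishes downloading, so unpacking
# overlaps with fetching. All names here are illustrative stand-ins.
import multiprocessing
import time


def FetchWorker(fetch_queue, unpack_queue):
  for target in iter(fetch_queue.get, None):
    time.sleep(0.1)           # stand-in for downloading the binary package
    unpack_queue.put(target)  # a completed fetch feeds the unpack queue


def UnpackWorker(unpack_queue, done_queue):
  for target in iter(unpack_queue.get, None):
    time.sleep(0.1)           # stand-in for bzip2 -dc pkg.tbz2 | tar -x
    done_queue.put(target)


if __name__ == "__main__":
  targets = ["pkg-%d" % i for i in range(8)]
  fetch_q, unpack_q, done_q = (multiprocessing.Queue() for _ in range(3))
  fetchers = [multiprocessing.Process(target=FetchWorker,
                                      args=(fetch_q, unpack_q))
              for _ in range(2)]
  unpackers = [multiprocessing.Process(target=UnpackWorker,
                                       args=(unpack_q, done_q))
               for _ in range(2)]
  for proc in fetchers + unpackers:
    proc.start()
  for target in targets:
    fetch_q.put(target)
  for _ in fetchers:
    fetch_q.put(None)    # sentinels: no more packages to fetch
  for proc in fetchers:
    proc.join()
  for _ in unpackers:
    unpack_q.put(None)   # all fetches are done, so shut the unpackers down
  for proc in unpackers:
    proc.join()
  print(sorted(done_q.get() for _ in targets))

In the actual patch the unpack workers are a separate multiprocessing pool sized like the build pool (from --jobs), and it is only created when --unpackonly is passed.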
Diffstat (limited to 'scripts/parallel_emerge.py')
-rw-r--r-- | scripts/parallel_emerge.py | 143
1 file changed, 125 insertions(+), 18 deletions(-)
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 5e5833b00..f536b1892 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -29,6 +29,8 @@ import threading
 import time
 import traceback

+from chromite.lib import cros_build_lib
+
 # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
 # Chromium OS, the default "portage" user doesn't have the necessary
 # permissions. It'd be easier if we could default to $USERNAME, but $USERNAME
@@ -203,13 +205,14 @@ class DepGraphGenerator(object):
     PrintDepsMap(deps_graph)
   """

-  __slots__ = ["board", "emerge", "package_db", "show_output"]
+  __slots__ = ["board", "emerge", "package_db", "show_output", "unpack_only"]

   def __init__(self):
     self.board = None
     self.emerge = EmergeData()
     self.package_db = {}
     self.show_output = False
+    self.unpack_only = False

   def ParseParallelEmergeArgs(self, argv):
     """Read the parallel emerge arguments from the command-line.
@@ -239,6 +242,9 @@ class DepGraphGenerator(object):
         self.show_output = True
       elif arg == "--rebuild":
         emerge_args.append("--rebuild-if-unbuilt")
+      elif arg == "--unpackonly":
+        emerge_args.append("--fetchonly")
+        self.unpack_only = True
       else:
         # Not one of our options, so pass through to emerge.
         emerge_args.append(arg)
@@ -792,10 +798,10 @@ def PrintDepsMap(deps_map):
 class EmergeJobState(object):
   __slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
                "last_output_timestamp", "pkgname", "retcode", "start_timestamp",
-               "target", "fetch_only"]
+               "target", "fetch_only", "unpack_only"]

   def __init__(self, target, pkgname, done, filename, start_timestamp,
-               retcode=None, fetch_only=False):
+               retcode=None, fetch_only=False, unpack_only=False):

     # The full name of the target we're building (e.g.
     # chromeos-base/chromeos-0.0.1-r60)
@@ -833,6 +839,9 @@ class EmergeJobState(object):
     # The timestamp when our job started.
     self.start_timestamp = start_timestamp

+    # No emerge, only unpack packages.
+    self.unpack_only = unpack_only
+

 def KillHandler(_signum, _frame):
   # Kill self and all subprocesses.
@@ -916,7 +925,42 @@ def EmergeProcess(output, *args, **kwargs):
   # Return the exit code of the subprocess.
   return os.waitpid(pid, 0)[1]

-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+
+def UnpackPackage(pkg_state):
+  """Unpacks package described by pkg_state.
+
+  Args:
+    pkg_state: EmergeJobState object describing target.
+
+  Returns:
+    Exit code returned by subprocess.
+  """
+  pkgdir = os.environ.get("PKGDIR",
+                          os.path.join(os.environ["SYSROOT"], "packages"))
+  root = os.environ.get("ROOT", os.environ["SYSROOT"])
+  path = os.path.join(pkgdir, pkg_state.target + ".tbz2")
+  comp = cros_build_lib.FindCompressor(cros_build_lib.COMP_BZIP2)
+  cmd = [comp, "-dc"]
+  if comp.endswith("pbzip2"):
+    cmd.append("--ignore-trailing-garbage=1")
+  cmd.append(path)
+
+  result = cros_build_lib.RunCommand(cmd, cwd=root, stdout_to_pipe=True,
+                                     print_cmd=False, error_code_ok=True)
+
+  # If we were not successful, return now and don't attempt untar.
+  if result.returncode:
+    return result.returncode
+
+  cmd = ["sudo", "tar", "-xf", "-", "-C", root]
+  result = cros_build_lib.RunCommand(cmd, cwd=root, input=result.output,
+                                     print_cmd=False, error_code_ok=True)
+
+  return result.returncode
+
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False,
+                 unpack_only=False):
   """This worker emerges any packages given to it on the task_queue.

   Args:
@@ -925,6 +969,7 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
     emerge: An EmergeData() object.
     package_db: A dict, mapping package ids to portage Package objects.
     fetch_only: A bool, indicating if we should just fetch the target.
+    unpack_only: A bool, indicating if we should just unpack the target.

   It expects package identifiers to be passed to it via task_queue. When
   a task is started, it pushes the (target, filename) to the started_queue.
@@ -982,16 +1027,19 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
     os.chmod(output.name, 644)
     start_timestamp = time.time()
     job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
-                         fetch_only=fetch_only)
+                         fetch_only=fetch_only, unpack_only=unpack_only)
     job_queue.put(job)
     if "--pretend" in opts:
       retcode = 0
     else:
       try:
         emerge.scheduler_graph.mergelist = install_list
-        retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
-                                spinner, favorites=emerge.favorites,
-                                graph_config=emerge.scheduler_graph)
+        if unpack_only:
+          retcode = UnpackPackage(pkg_state)
+        else:
+          retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
+                                  spinner, favorites=emerge.favorites,
+                                  graph_config=emerge.scheduler_graph)
       except Exception:
         traceback.print_exc(file=output)
         retcode = 1
@@ -1001,7 +1049,8 @@
       return

     job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
-                         retcode, fetch_only=fetch_only)
+                         retcode, fetch_only=fetch_only,
+                         unpack_only=unpack_only)
     job_queue.put(job)
@@ -1176,7 +1225,7 @@ class ScoredHeap(object):
 class EmergeQueue(object):
   """Class to schedule emerge jobs according to a dependency graph."""

-  def __init__(self, deps_map, emerge, package_db, show_output):
+  def __init__(self, deps_map, emerge, package_db, show_output, unpack_only):
     # Store the dependency graph.
     self._deps_map = deps_map
     self._state_map = {}
@@ -1185,10 +1234,13 @@ class EmergeQueue(object):
     self._build_ready = ScoredHeap()
     self._fetch_jobs = {}
     self._fetch_ready = ScoredHeap()
+    self._unpack_jobs = {}
+    self._unpack_ready = ScoredHeap()
     # List of total package installs represented in deps_map.
     install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
     self._total_jobs = len(install_jobs)
     self._show_output = show_output
+    self._unpack_only = unpack_only

     if "--pretend" in emerge.opts:
       print "Skipping merge because of --pretend mode."
@@ -1207,7 +1259,7 @@ class EmergeQueue(object):
     # jobs.
     procs = min(self._total_jobs,
                 emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
-    self._build_procs = self._fetch_procs = max(1, procs)
+    self._build_procs = self._unpack_procs = self._fetch_procs = max(1, procs)
     self._load_avg = emerge.opts.pop("--load-average", None)
     self._job_queue = multiprocessing.Queue()
     self._print_queue = multiprocessing.Queue()
@@ -1222,6 +1274,14 @@ class EmergeQueue(object):
     self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
                                             args)

+    if self._unpack_only:
+      # Unpack pool only required on unpack_only jobs.
+      self._unpack_queue = multiprocessing.Queue()
+      args = (self._unpack_queue, self._job_queue, emerge, package_db, False,
+              True)
+      self._unpack_pool = multiprocessing.Pool(self._unpack_procs, EmergeWorker,
+                                               args)
+
     self._print_worker = multiprocessing.Process(target=PrintWorker,
                                                  args=[self._print_queue])
     self._print_worker.start()
@@ -1267,6 +1327,10 @@ class EmergeQueue(object):
     signal.signal(signal.SIGINT, ExitHandler)
     signal.signal(signal.SIGTERM, ExitHandler)

+  def _ScheduleUnpack(self, pkg_state):
+    self._unpack_jobs[pkg_state.target] = None
+    self._unpack_queue.put(pkg_state)
+
   def _Schedule(self, pkg_state):
     # We maintain a tree of all deps, if this doesn't need
     # to be installed just free up its children and continue.
@@ -1284,19 +1348,31 @@ class EmergeQueue(object):
       self._build_queue.put(pkg_state)
       return True

-  def _ScheduleLoop(self):
+  def _ScheduleLoop(self, unpack_only=False):
+    if unpack_only:
+      ready_queue = self._unpack_ready
+      jobs_queue = self._unpack_jobs
+      procs = self._unpack_procs
+    else:
+      ready_queue = self._build_ready
+      jobs_queue = self._build_jobs
+      procs = self._build_procs
+
     # If the current load exceeds our desired load average, don't schedule
     # more than one job.
     if self._load_avg and os.getloadavg()[0] > self._load_avg:
       needed_jobs = 1
     else:
-      needed_jobs = self._build_procs
+      needed_jobs = procs

     # Schedule more jobs.
-    while self._build_ready and len(self._build_jobs) < needed_jobs:
-      state = self._build_ready.get()
-      if state.target not in self._failed:
-        self._Schedule(state)
+    while ready_queue and len(jobs_queue) < needed_jobs:
+      state = ready_queue.get()
+      if unpack_only:
+        self._ScheduleUnpack(state)
+      else:
+        if state.target not in self._failed:
+          self._Schedule(state)

   def _Print(self, line):
     """Print a single line."""
@@ -1337,12 +1413,15 @@
     if no_output:
       seconds = current_time - GLOBAL_START
       fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
+      ujobs, uready = len(self._unpack_jobs), len(self._unpack_ready)
       bjobs, bready = len(self._build_jobs), len(self._build_ready)
       retries = len(self._retry_queue)
       pending = max(0, len(self._deps_map) - fjobs - bjobs)
       line = "Pending %s/%s, " % (pending, self._total_jobs)
       if fjobs or fready:
         line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
+      if ujobs or uready:
+        line += "Unpacking %s/%s, " % (ujobs, uready + ujobs)
       if bjobs or bready or retries:
         line += "Building %s/%s, " % (bjobs, bready + bjobs)
       if retries:
@@ -1405,6 +1484,10 @@
       _stop(self._build_queue, self._build_pool)
       self._build_queue = self._build_pool = None

+    if self._unpack_only:
+      _stop(self._unpack_queue, self._unpack_pool)
+      self._unpack_queue = self._unpack_pool = None
+
     if self._job_queue is not None:
       self._job_queue.close()
       self._job_queue = None
@@ -1444,6 +1527,8 @@
           self._job_queue.empty() and
           not self._fetch_jobs and
           not self._fetch_ready and
+          not self._unpack_jobs and
+          not self._unpack_ready and
           not self._build_jobs and
           not self._build_ready and
           self._deps_map):
@@ -1498,6 +1583,10 @@
             self._build_ready.put(state)
             self._ScheduleLoop()

+          if self._unpack_only and job.retcode == 0:
+            self._unpack_ready.put(state)
+            self._ScheduleLoop(unpack_only=True)
+
           if self._fetch_ready:
             state = self._fetch_ready.get()
             self._fetch_queue.put(state)
@@ -1508,6 +1597,23 @@
             self._fetch_queue.put(None)
         continue

+      if job.unpack_only:
+        if not job.done:
+          self._unpack_jobs[target] = job
+        else:
+          del self._unpack_jobs[target]
+          self._Print("Unpacked %s in %2.2fs"
+                      % (target, time.time() - job.start_timestamp))
+          if self._show_output or job.retcode != 0:
+            self._print_queue.put(JobPrinter(job, unlink=True))
+          else:
+            os.unlink(job.filename)
+          if self._unpack_ready:
+            state = self._unpack_ready.get()
+            self._unpack_queue.put(state)
+            self._unpack_jobs[state.target] = None
+        continue
+
       if not job.done:
         self._build_jobs[target] = job
         self._Print("Started %s (logged in %s)" % (target, job.filename))
@@ -1667,7 +1773,8 @@ def real_main(argv):
     os.execvp(args[0], args)

   # Run the queued emerges.
-  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
+  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output,
+                          deps.unpack_only)
   try:
     scheduler.Run()
   finally:
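As a companion to the diff above, here is a standalone sketch of what the new UnpackPackage() helper does for each binary package: decompress <target>.tbz2 from $PKGDIR and pipe it into tar under $ROOT. The patch itself goes through cros_build_lib.RunCommand, capturing the decompressed output and handing it to tar, and prefers pbzip2 when available; the sketch below streams through plain subprocess pipes instead, and the package atom and sysroot in the usage comment are made-up examples, not values from the change.

# Standalone sketch of the per-package unpack step (not the chromite code):
# decompress the fetched .tbz2 and pipe it into tar under the sysroot.
import os
import subprocess


def UnpackBinpkg(target, sysroot):
  """Unpack <pkgdir>/<target>.tbz2 into the root, mirroring UnpackPackage."""
  pkgdir = os.environ.get("PKGDIR", os.path.join(sysroot, "packages"))
  root = os.environ.get("ROOT", sysroot)
  path = os.path.join(pkgdir, target + ".tbz2")

  # Equivalent of: bzip2 -dc <path> | sudo tar -xf - -C <root>
  bunzip = subprocess.Popen(["bzip2", "-dc", path], stdout=subprocess.PIPE)
  tar = subprocess.Popen(["sudo", "tar", "-xf", "-", "-C", root],
                         stdin=bunzip.stdout)
  bunzip.stdout.close()  # let bzip2 see SIGPIPE if tar exits early
  tar.communicate()
  return bunzip.wait() or tar.returncode


# Example (hypothetical package atom and sysroot):
# UnpackBinpkg("chromeos-base/libchrome-271506-r1", "/build/amd64-generic")

In both the patch and the sketch the tar step runs under sudo, presumably because the sysroot being populated is root-owned.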