aboutsummaryrefslogtreecommitdiff
path: root/crosperf/schedv2.py
diff options
context:
space:
mode:
Diffstat (limited to 'crosperf/schedv2.py')
-rw-r--r--crosperf/schedv2.py798
1 files changed, 414 insertions, 384 deletions
diff --git a/crosperf/schedv2.py b/crosperf/schedv2.py
index 49c6344d..828b8b81 100644
--- a/crosperf/schedv2.py
+++ b/crosperf/schedv2.py
@@ -1,449 +1,479 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module to optimize the scheduling of benchmark_run tasks."""
-from __future__ import division
-from __future__ import print_function
-
-import sys
-import traceback
from collections import defaultdict
+import sys
from threading import Lock
from threading import Thread
+import traceback
-import test_flag
-
-from machine_image_manager import MachineImageManager
from cros_utils import command_executer
from cros_utils import logger
+from machine_image_manager import MachineImageManager
+import test_flag
class DutWorker(Thread):
- """Working thread for a dut."""
-
- def __init__(self, dut, sched):
- super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
- self._dut = dut
- self._sched = sched
- self._stat_num_br_run = 0
- self._stat_num_reimage = 0
- self._stat_annotation = ''
- self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
- self.daemon = True
- self._terminated = False
- self._active_br = None
- # Race condition accessing _active_br between _execute_benchmark_run and
- # _terminate, so lock it up.
- self._active_br_lock = Lock()
-
- def terminate(self):
- self._terminated = True
- with self._active_br_lock:
- if self._active_br is not None:
- # BenchmarkRun.Terminate() terminates any running testcase via
- # suite_runner.Terminate and updates timeline.
- self._active_br.Terminate()
-
- def run(self):
- """Do the "run-test->(optionally reimage)->run-test" chore.
-
- Note - 'br' below means 'benchmark_run'.
- """
-
- # Firstly, handle benchmarkruns that have cache hit.
- br = self._sched.get_cached_benchmark_run()
- while br:
- try:
- self._stat_annotation = 'finishing cached {}'.format(br)
- br.run()
- except RuntimeError:
- traceback.print_exc(file=sys.stdout)
- br = self._sched.get_cached_benchmark_run()
-
- # Secondly, handle benchmarkruns that needs to be run on dut.
- self._setup_dut_label()
- try:
- self._logger.LogOutput('{} started.'.format(self))
- while not self._terminated:
- br = self._sched.get_benchmark_run(self._dut)
- if br is None:
- # No br left for this label. Considering reimaging.
- label = self._sched.allocate_label(self._dut)
- if label is None:
- # No br even for other labels. We are done.
- self._logger.LogOutput('ImageManager found no label '
- 'for dut, stopping working '
- 'thread {}.'.format(self))
- break
- if self._reimage(label):
- # Reimage to run other br fails, dut is doomed, stop
- # this thread.
- self._logger.LogWarning('Re-image failed, dut '
- 'in an unstable state, stopping '
- 'working thread {}.'.format(self))
- break
- else:
- # Execute the br.
- self._execute_benchmark_run(br)
- finally:
- self._stat_annotation = 'finished'
- # Thread finishes. Notify scheduler that I'm done.
- self._sched.dut_worker_finished(self)
-
- def _reimage(self, label):
- """Reimage image to label.
-
- Args:
- label: the label to remimage onto dut.
-
- Returns:
- 0 if successful, otherwise 1.
- """
-
- # Termination could happen anywhere, check it.
- if self._terminated:
- return 1
-
- if self._sched.get_experiment().crosfleet:
- self._logger.LogOutput('Crosfleet mode, do not image before testing.')
- self._dut.label = label
- return 0
-
- self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
- self._stat_num_reimage += 1
- self._stat_annotation = 'reimaging using "{}"'.format(label.name)
- try:
- # Note, only 1 reimage at any given time, this is guaranteed in
- # ImageMachine, so no sync needed below.
- retval = self._sched.get_experiment().machine_manager.ImageMachine(
- self._dut, label)
+ """Working thread for a dut."""
+
+ def __init__(self, dut, sched):
+ super(DutWorker, self).__init__(name="DutWorker-{}".format(dut.name))
+ self._dut = dut
+ self._sched = sched
+ self._stat_num_br_run = 0
+ self._stat_num_reimage = 0
+ self._stat_annotation = ""
+ self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
+ self.daemon = True
+ self._terminated = False
+ self._active_br = None
+ # Race condition accessing _active_br between _execute_benchmark_run and
+ # _terminate, so lock it up.
+ self._active_br_lock = Lock()
- if retval:
- return 1
- except RuntimeError:
- return 1
+ def terminate(self):
+ self._terminated = True
+ with self._active_br_lock:
+ if self._active_br is not None:
+ # BenchmarkRun.Terminate() terminates any running testcase via
+ # suite_runner.Terminate and updates timeline.
+ self._active_br.Terminate()
- self._dut.label = label
- return 0
+ def run(self):
+ """Do the "run-test->(optionally reimage)->run-test" chore.
- def _execute_benchmark_run(self, br):
- """Execute a single benchmark_run.
+ Note - 'br' below means 'benchmark_run'.
+ """
+
+ # Firstly, handle benchmarkruns that have cache hit.
+ br = self._sched.get_cached_benchmark_run()
+ while br:
+ try:
+ self._stat_annotation = "finishing cached {}".format(br)
+ br.run()
+ except RuntimeError:
+ traceback.print_exc(file=sys.stdout)
+ br = self._sched.get_cached_benchmark_run()
+
+ # Secondly, handle benchmarkruns that needs to be run on dut.
+ self._setup_dut_label()
+ try:
+ self._logger.LogOutput("{} started.".format(self))
+ while not self._terminated:
+ br = self._sched.get_benchmark_run(self._dut)
+ if br is None:
+ # No br left for this label. Considering reimaging.
+ label = self._sched.allocate_label(self._dut)
+ if label is None:
+ # No br even for other labels. We are done.
+ self._logger.LogOutput(
+ "ImageManager found no label "
+ "for dut, stopping working "
+ "thread {}.".format(self)
+ )
+ break
+ if self._reimage(label):
+ # Reimage to run other br fails, dut is doomed, stop
+ # this thread.
+ self._logger.LogWarning(
+ "Re-image failed, dut "
+ "in an unstable state, stopping "
+ "working thread {}.".format(self)
+ )
+ break
+ else:
+ # Execute the br.
+ self._execute_benchmark_run(br)
+ finally:
+ self._stat_annotation = "finished"
+ # Thread finishes. Notify scheduler that I'm done.
+ self._sched.dut_worker_finished(self)
+
+ def _reimage(self, label):
+ """Reimage image to label.
+
+ Args:
+ label: the label to remimage onto dut.
+
+ Returns:
+ 0 if successful, otherwise 1.
+ """
+
+ # Termination could happen anywhere, check it.
+ if self._terminated:
+ return 1
+
+ if self._sched.get_experiment().crosfleet:
+ self._logger.LogOutput(
+ "Crosfleet mode, do not image before testing."
+ )
+ self._dut.label = label
+ return 0
+
+ self._logger.LogOutput("Reimaging {} using {}".format(self, label))
+ self._stat_num_reimage += 1
+ self._stat_annotation = 'reimaging using "{}"'.format(label.name)
+ try:
+ # Note, only 1 reimage at any given time, this is guaranteed in
+ # ImageMachine, so no sync needed below.
+ retval = self._sched.get_experiment().machine_manager.ImageMachine(
+ self._dut, label
+ )
+
+ if retval:
+ return 1
+ except RuntimeError:
+ return 1
+
+ self._dut.label = label
+ return 0
+
+ def _execute_benchmark_run(self, br):
+ """Execute a single benchmark_run.
Note - this function never throws exceptions.
- """
+ """
- # Termination could happen anywhere, check it.
- if self._terminated:
- return
-
- self._logger.LogOutput('{} started working on {}'.format(self, br))
- self._stat_num_br_run += 1
- self._stat_annotation = 'executing {}'.format(br)
- # benchmark_run.run does not throws, but just play it safe here.
- try:
- assert br.owner_thread is None
- br.owner_thread = self
- with self._active_br_lock:
- self._active_br = br
- br.run()
- finally:
- self._sched.get_experiment().BenchmarkRunFinished(br)
- with self._active_br_lock:
- self._active_br = None
+ # Termination could happen anywhere, check it.
+ if self._terminated:
+ return
- def _setup_dut_label(self):
- """Try to match dut image with a certain experiment label.
+ self._logger.LogOutput("{} started working on {}".format(self, br))
+ self._stat_num_br_run += 1
+ self._stat_annotation = "executing {}".format(br)
+ # benchmark_run.run does not throws, but just play it safe here.
+ try:
+ assert br.owner_thread is None
+ br.owner_thread = self
+ with self._active_br_lock:
+ self._active_br = br
+ br.run()
+ finally:
+ self._sched.get_experiment().BenchmarkRunFinished(br)
+ with self._active_br_lock:
+ self._active_br = None
+
+ def _setup_dut_label(self):
+ """Try to match dut image with a certain experiment label.
If such match is found, we just skip doing reimage and jump to execute
some benchmark_runs.
- """
-
- checksum_file = '/usr/local/osimage_checksum_file'
- try:
- rv, checksum, _ = command_executer.GetCommandExecuter().\
- CrosRunCommandWOutput(
- 'cat ' + checksum_file,
- chromeos_root=self._sched.get_labels(0).chromeos_root,
- machine=self._dut.name,
- print_to_console=False)
- if rv == 0:
- checksum = checksum.strip()
- for l in self._sched.get_labels():
- if l.checksum == checksum:
- self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
- self._dut.name, l))
- self._dut.label = l
- return
- except RuntimeError:
- traceback.print_exc(file=sys.stdout)
- self._dut.label = None
-
- def __str__(self):
- return 'DutWorker[dut="{}", label="{}"]'.format(
- self._dut.name, self._dut.label.name if self._dut.label else 'None')
-
- def dut(self):
- return self._dut
-
- def status_str(self):
- """Report thread status."""
-
- return ('Worker thread "{}", label="{}", benchmark_run={}, '
- 'reimage={}, now {}'.format(
+ """
+
+ checksum_file = "/usr/local/osimage_checksum_file"
+ try:
+ (
+ rv,
+ checksum,
+ _,
+ ) = command_executer.GetCommandExecuter().CrosRunCommandWOutput(
+ "cat " + checksum_file,
+ chromeos_root=self._sched.get_labels(0).chromeos_root,
+ machine=self._dut.name,
+ print_to_console=False,
+ )
+ if rv == 0:
+ checksum = checksum.strip()
+ for l in self._sched.get_labels():
+ if l.checksum == checksum:
+ self._logger.LogOutput(
+ "Dut '{}' is pre-installed with '{}'".format(
+ self._dut.name, l
+ )
+ )
+ self._dut.label = l
+ return
+ except RuntimeError:
+ traceback.print_exc(file=sys.stdout)
+ self._dut.label = None
+
+ def __str__(self):
+ return 'DutWorker[dut="{}", label="{}"]'.format(
+ self._dut.name, self._dut.label.name if self._dut.label else "None"
+ )
+
+ def dut(self):
+ return self._dut
+
+ def status_str(self):
+ """Report thread status."""
+
+ return (
+ 'Worker thread "{}", label="{}", benchmark_run={}, '
+ "reimage={}, now {}".format(
self._dut.name,
- 'None' if self._dut.label is None else self._dut.label.name,
- self._stat_num_br_run, self._stat_num_reimage,
- self._stat_annotation))
+ "None" if self._dut.label is None else self._dut.label.name,
+ self._stat_num_br_run,
+ self._stat_num_reimage,
+ self._stat_annotation,
+ )
+ )
class BenchmarkRunCacheReader(Thread):
- """The thread to read cache for a list of benchmark_runs.
+ """The thread to read cache for a list of benchmark_runs.
On creation, each instance of this class is given a br_list, which is a
subset of experiment._benchmark_runs.
- """
-
- def __init__(self, schedv2, br_list):
- super(BenchmarkRunCacheReader, self).__init__()
- self._schedv2 = schedv2
- self._br_list = br_list
- self._logger = self._schedv2.get_logger()
-
- def run(self):
- for br in self._br_list:
- try:
- br.ReadCache()
- if br.cache_hit:
- self._logger.LogOutput('Cache hit - {}'.format(br))
- with self._schedv2.lock_on('_cached_br_list'):
- self._schedv2.get_cached_run_list().append(br)
- else:
- self._logger.LogOutput('Cache not hit - {}'.format(br))
- except RuntimeError:
- traceback.print_exc(file=sys.stderr)
+ """
+
+ def __init__(self, schedv2, br_list):
+ super(BenchmarkRunCacheReader, self).__init__()
+ self._schedv2 = schedv2
+ self._br_list = br_list
+ self._logger = self._schedv2.get_logger()
+
+ def run(self):
+ for br in self._br_list:
+ try:
+ br.ReadCache()
+ if br.cache_hit:
+ self._logger.LogOutput("Cache hit - {}".format(br))
+ with self._schedv2.lock_on("_cached_br_list"):
+ self._schedv2.get_cached_run_list().append(br)
+ else:
+ self._logger.LogOutput("Cache not hit - {}".format(br))
+ except RuntimeError:
+ traceback.print_exc(file=sys.stderr)
class Schedv2(object):
- """New scheduler for crosperf."""
+ """New scheduler for crosperf."""
- def __init__(self, experiment):
- self._experiment = experiment
- self._logger = logger.GetLogger(experiment.log_dir)
+ def __init__(self, experiment):
+ self._experiment = experiment
+ self._logger = logger.GetLogger(experiment.log_dir)
- # Create shortcuts to nested data structure. "_duts" points to a list of
- # locked machines. _labels points to a list of all labels.
- self._duts = self._experiment.machine_manager.GetMachines()
- self._labels = self._experiment.labels
+ # Create shortcuts to nested data structure. "_duts" points to a list of
+ # locked machines. _labels points to a list of all labels.
+ self._duts = self._experiment.machine_manager.GetMachines()
+ self._labels = self._experiment.labels
- # Bookkeeping for synchronization.
- self._workers_lock = Lock()
- # pylint: disable=unnecessary-lambda
- self._lock_map = defaultdict(lambda: Lock())
+ # Bookkeeping for synchronization.
+ self._workers_lock = Lock()
+ # pylint: disable=unnecessary-lambda
+ self._lock_map = defaultdict(lambda: Lock())
- # Test mode flag
- self._in_test_mode = test_flag.GetTestMode()
+ # Test mode flag
+ self._in_test_mode = test_flag.GetTestMode()
- # Read benchmarkrun cache.
- self._read_br_cache()
+ # Read benchmarkrun cache.
+ self._read_br_cache()
- # Mapping from label to a list of benchmark_runs.
- self._label_brl_map = dict((l, []) for l in self._labels)
- for br in self._experiment.benchmark_runs:
- assert br.label in self._label_brl_map
- # Only put no-cache-hit br into the map.
- if br not in self._cached_br_list:
- self._label_brl_map[br.label].append(br)
+ # Mapping from label to a list of benchmark_runs.
+ self._label_brl_map = dict((l, []) for l in self._labels)
+ for br in self._experiment.benchmark_runs:
+ assert br.label in self._label_brl_map
+ # Only put no-cache-hit br into the map.
+ if br not in self._cached_br_list:
+ self._label_brl_map[br.label].append(br)
- # Use machine image manager to calculate initial label allocation.
- self._mim = MachineImageManager(self._labels, self._duts)
- self._mim.compute_initial_allocation()
+ # Use machine image manager to calculate initial label allocation.
+ self._mim = MachineImageManager(self._labels, self._duts)
+ self._mim.compute_initial_allocation()
- # Create worker thread, 1 per dut.
- self._active_workers = [DutWorker(dut, self) for dut in self._duts]
- self._finished_workers = []
+ # Create worker thread, 1 per dut.
+ self._active_workers = [DutWorker(dut, self) for dut in self._duts]
+ self._finished_workers = []
- # Termination flag.
- self._terminated = False
+ # Termination flag.
+ self._terminated = False
- def run_sched(self):
- """Start all dut worker threads and return immediately."""
+ def run_sched(self):
+ """Start all dut worker threads and return immediately."""
- for w in self._active_workers:
- w.start()
+ for w in self._active_workers:
+ w.start()
- def _read_br_cache(self):
- """Use multi-threading to read cache for all benchmarkruns.
+ def _read_br_cache(self):
+ """Use multi-threading to read cache for all benchmarkruns.
We do this by firstly creating a few threads, and then assign each
thread a segment of all brs. Each thread will check cache status for
each br and put those with cache into '_cached_br_list'.
- """
-
- self._cached_br_list = []
- n_benchmarkruns = len(self._experiment.benchmark_runs)
- if n_benchmarkruns <= 4:
- # Use single thread to read cache.
- self._logger.LogOutput(('Starting to read cache status for '
- '{} benchmark runs ...').format(n_benchmarkruns))
- BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
- return
-
- # Split benchmarkruns set into segments. Each segment will be handled by
- # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4).
- n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
- self._logger.LogOutput(
- ('Starting {} threads to read cache status for '
- '{} benchmark runs ...').format(n_threads, n_benchmarkruns))
- benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) // n_threads
- benchmarkrun_segments = []
- for i in range(n_threads - 1):
- start = i * benchmarkruns_per_thread
- end = (i + 1) * benchmarkruns_per_thread
- benchmarkrun_segments.append(self._experiment.benchmark_runs[start:end])
- benchmarkrun_segments.append(
- self._experiment.benchmark_runs[(n_threads - 1) *
- benchmarkruns_per_thread:])
-
- # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
- assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns
-
- # Create and start all readers.
- cache_readers = [
- BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
- ]
-
- for x in cache_readers:
- x.start()
-
- # Wait till all readers finish.
- for x in cache_readers:
- x.join()
-
- # Summarize.
- self._logger.LogOutput(
- 'Total {} cache hit out of {} benchmark_runs.'.format(
- len(self._cached_br_list), n_benchmarkruns))
-
- def get_cached_run_list(self):
- return self._cached_br_list
-
- def get_label_map(self):
- return self._label_brl_map
-
- def get_experiment(self):
- return self._experiment
-
- def get_labels(self, i=None):
- if i is None:
- return self._labels
- return self._labels[i]
-
- def get_logger(self):
- return self._logger
-
- def get_cached_benchmark_run(self):
- """Get a benchmark_run with 'cache hit'.
-
- Returns:
- The benchmark that has cache hit, if any. Otherwise none.
- """
-
- with self.lock_on('_cached_br_list'):
- if self._cached_br_list:
- return self._cached_br_list.pop()
- return None
-
- def get_benchmark_run(self, dut):
- """Get a benchmark_run (br) object for a certain dut.
-
- Args:
- dut: the dut for which a br is returned.
-
- Returns:
- A br with its label matching that of the dut. If no such br could be
- found, return None (this usually means a reimage is required for the
- dut).
- """
-
- # If terminated, stop providing any br.
- if self._terminated:
- return None
-
- # If dut bears an unrecognized label, return None.
- if dut.label is None:
- return None
-
- # If br list for the dut's label is empty (that means all brs for this
- # label have been done), return None.
- with self.lock_on(dut.label):
- brl = self._label_brl_map[dut.label]
- if not brl:
- return None
- # Return the first br.
- return brl.pop(0)
-
- def allocate_label(self, dut):
- """Allocate a label to a dut.
-
- The work is delegated to MachineImageManager.
+ """
+
+ self._cached_br_list = []
+ n_benchmarkruns = len(self._experiment.benchmark_runs)
+ if n_benchmarkruns <= 4:
+ # Use single thread to read cache.
+ self._logger.LogOutput(
+ (
+ "Starting to read cache status for " "{} benchmark runs ..."
+ ).format(n_benchmarkruns)
+ )
+ BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
+ return
- The dut_worker calling this method is responsible for reimage the dut to
- this label.
+ # Split benchmarkruns set into segments. Each segment will be handled by
+ # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4).
+ n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
+ self._logger.LogOutput(
+ (
+ "Starting {} threads to read cache status for "
+ "{} benchmark runs ..."
+ ).format(n_threads, n_benchmarkruns)
+ )
+ benchmarkruns_per_thread = (
+ n_benchmarkruns + n_threads - 1
+ ) // n_threads
+ benchmarkrun_segments = []
+ for i in range(n_threads - 1):
+ start = i * benchmarkruns_per_thread
+ end = (i + 1) * benchmarkruns_per_thread
+ benchmarkrun_segments.append(
+ self._experiment.benchmark_runs[start:end]
+ )
+ benchmarkrun_segments.append(
+ self._experiment.benchmark_runs[
+ (n_threads - 1) * benchmarkruns_per_thread :
+ ]
+ )
+
+ # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
+ assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns
+
+ # Create and start all readers.
+ cache_readers = [
+ BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
+ ]
+
+ for x in cache_readers:
+ x.start()
+
+ # Wait till all readers finish.
+ for x in cache_readers:
+ x.join()
+
+ # Summarize.
+ self._logger.LogOutput(
+ "Total {} cache hit out of {} benchmark_runs.".format(
+ len(self._cached_br_list), n_benchmarkruns
+ )
+ )
+
+ def get_cached_run_list(self):
+ return self._cached_br_list
+
+ def get_label_map(self):
+ return self._label_brl_map
+
+ def get_experiment(self):
+ return self._experiment
+
+ def get_labels(self, i=None):
+ if i is None:
+ return self._labels
+ return self._labels[i]
+
+ def get_logger(self):
+ return self._logger
+
+ def get_cached_benchmark_run(self):
+ """Get a benchmark_run with 'cache hit'.
+
+ Returns:
+ The benchmark that has cache hit, if any. Otherwise none.
+ """
+
+ with self.lock_on("_cached_br_list"):
+ if self._cached_br_list:
+ return self._cached_br_list.pop()
+ return None
+
+ def get_benchmark_run(self, dut):
+ """Get a benchmark_run (br) object for a certain dut.
+
+ Args:
+ dut: the dut for which a br is returned.
+
+ Returns:
+ A br with its label matching that of the dut. If no such br could be
+ found, return None (this usually means a reimage is required for the
+ dut).
+ """
+
+ # If terminated, stop providing any br.
+ if self._terminated:
+ return None
+
+ # If dut bears an unrecognized label, return None.
+ if dut.label is None:
+ return None
+
+ # If br list for the dut's label is empty (that means all brs for this
+ # label have been done), return None.
+ with self.lock_on(dut.label):
+ brl = self._label_brl_map[dut.label]
+ if not brl:
+ return None
+ # Return the first br.
+ return brl.pop(0)
+
+ def allocate_label(self, dut):
+ """Allocate a label to a dut.
+
+ The work is delegated to MachineImageManager.
+
+ The dut_worker calling this method is responsible for reimage the dut to
+ this label.
- Args:
- dut: the new label that is to be reimaged onto the dut.
+ Args:
+ dut: the new label that is to be reimaged onto the dut.
- Returns:
- The label or None.
- """
+ Returns:
+ The label or None.
+ """
- if self._terminated:
- return None
+ if self._terminated:
+ return None
- return self._mim.allocate(dut, self)
+ return self._mim.allocate(dut, self)
- def dut_worker_finished(self, dut_worker):
- """Notify schedv2 that the dut_worker thread finished.
+ def dut_worker_finished(self, dut_worker):
+ """Notify schedv2 that the dut_worker thread finished.
- Args:
- dut_worker: the thread that is about to end.
- """
+ Args:
+ dut_worker: the thread that is about to end.
+ """
- self._logger.LogOutput('{} finished.'.format(dut_worker))
- with self._workers_lock:
- self._active_workers.remove(dut_worker)
- self._finished_workers.append(dut_worker)
+ self._logger.LogOutput("{} finished.".format(dut_worker))
+ with self._workers_lock:
+ self._active_workers.remove(dut_worker)
+ self._finished_workers.append(dut_worker)
- def is_complete(self):
- return len(self._active_workers) == 0
+ def is_complete(self):
+ return len(self._active_workers) == 0
- def lock_on(self, my_object):
- return self._lock_map[my_object]
+ def lock_on(self, my_object):
+ return self._lock_map[my_object]
- def terminate(self):
- """Mark flag so we stop providing br/reimages.
+ def terminate(self):
+ """Mark flag so we stop providing br/reimages.
Also terminate each DutWorker, so they refuse to execute br or reimage.
- """
-
- self._terminated = True
- for dut_worker in self._active_workers:
- dut_worker.terminate()
-
- def threads_status_as_string(self):
- """Report the dut worker threads status."""
-
- status = '{} active threads, {} finished threads.\n'.format(
- len(self._active_workers), len(self._finished_workers))
- status += ' Active threads:'
- for dw in self._active_workers:
- status += '\n ' + dw.status_str()
- if self._finished_workers:
- status += '\n Finished threads:'
- for dw in self._finished_workers:
- status += '\n ' + dw.status_str()
- return status
+ """
+
+ self._terminated = True
+ for dut_worker in self._active_workers:
+ dut_worker.terminate()
+
+ def threads_status_as_string(self):
+ """Report the dut worker threads status."""
+
+ status = "{} active threads, {} finished threads.\n".format(
+ len(self._active_workers), len(self._finished_workers)
+ )
+ status += " Active threads:"
+ for dw in self._active_workers:
+ status += "\n " + dw.status_str()
+ if self._finished_workers:
+ status += "\n Finished threads:"
+ for dw in self._finished_workers:
+ status += "\n " + dw.status_str()
+ return status