aboutsummaryrefslogtreecommitdiff
path: root/crosperf/experiment.py
blob: 59e932f61cd3b058fe46cfd71fe7f7102458b69e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment setting module."""

import os
import time

import afe_lock_machine
from threading import Lock

from utils import logger
from utils import misc

from benchmark_run import BenchmarkRun
from machine_manager import MachineManager
from machine_manager import MockMachineManager
import test_flag


class Experiment(object):
  """Class representing an Experiment to be run."""

  def __init__(self, name, remote, working_directory,
               chromeos_root, cache_conditions, labels, benchmarks,
               experiment_file, email_to, acquire_timeout, log_dir,
               log_level, share_cache, results_directory, locks_directory):
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + "_results")
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    # If locks_directory (self.lock_dir) not blank, we will use the file
    # locking mechanism; if it is blank then we will use the AFE server
    # locking mechanism.
    self.locks_dir = locks_directory
    self.locked_machines = []

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
    if not chromeos_root:
      raise Exception("No chromeos_root given and could not determine one from "
                      "the image path.")

    if test_flag.GetTestMode():
      self.machine_manager = MockMachineManager(chromeos_root, acquire_timeout,
                                                log_level, locks_directory)
    else:
      self.machine_manager = MachineManager(chromeos_root, acquire_timeout,
                                            log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in remote:
      self.machine_manager.AddMachine(machine)
    for label in labels:
      self.machine_manager.ComputeCommonCheckSum(label)
      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
      self._schedv2 = schedv2

  def schedv2(self):
      return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generate benchmark runs from labels and benchmark defintions."""
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        for iteration in range(1, benchmark.iterations + 1):

          benchmark_run_name = "%s: %s (%s)" % (label.name, benchmark.name,
                                                iteration)
          full_name = "%s_%s_%s" % (label.name, benchmark.name, iteration)
          logger_to_use = logger.Logger(self.log_dir,
                                        "run.%s" % (full_name),
                                        True)
          benchmark_run = BenchmarkRun(benchmark_run_name,
                                       benchmark,
                                       label,
                                       iteration,
                                       self.cache_conditions,
                                       self.machine_manager,
                                       logger_to_use,
                                       self.log_level,
                                       self.share_cache)

          benchmark_runs.append(benchmark_run)
    return benchmark_runs

  def Build(self):
    pass

  def Terminate(self):
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        if t.isAlive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    if self._schedv2:
      return self._schedv2.is_complete()
    if self.active_threads:
      for t in self.active_threads:
        if t.isAlive():
          t.join(0)
        if not t.isAlive():
          self.num_complete += 1
          if not t.cache_hit:
            self.num_run_complete += 1
          self.active_threads.remove(t)
      return False
    return True

  def BenchmarkRunFinished(self, br):
      """Update internal counters after br finishes.

      Note this is only used by schedv2 and is called by multiple threads.
      Never throw any exception here.
      """

      assert self._schedv2 is not None
      with self._internal_counter_lock:
          self.num_complete += 1
          if not br.cache_hit:
            self.num_run_complete += 1

  def Run(self):
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      for benchmark_run in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        benchmark_run.daemon = True
        benchmark_run.start()
        self.active_threads.append(benchmark_run)

  def SetCacheConditions(self, cache_conditions):
    for benchmark_run in self.benchmark_runs:
      benchmark_run.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked."""
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(all_machines, "",
                                                 self.labels[0].chromeos_root,
                                                 None)
      machine_states = lock_mgr.GetMachineStates("unlock")
      for k, state in machine_states.iteritems():
        if state["locked"]:
          lock_mgr.UpdateLockInAFE(False, k)