aboutsummaryrefslogtreecommitdiff
path: root/bestflags/genetic_algorithm.py
blob: deb83f124a3c2fdcf1aef405fa2844d9056c0e94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The hill genetic algorithm.

Part of the Chrome build flags optimization.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import random

import flags
from flags import Flag
from flags import FlagSet
from generation import Generation
from task import Task


def CrossoverWith(first_flag, second_flag):
  """Choose which parent's gene the child inherits.

  The selection is a plain coin toss between the two inputs; the values are
  not blended.  An integer mask based blend could be substituted here later
  if that is ever required.

  Args:
    first_flag: The gene (Flag) offered by the first parent.
    second_flag: The gene (Flag) offered by the second parent.

  Returns:
    Either first_flag or second_flag, picked at random with equal odds.
  """

  # A draw of 1 keeps the first parent's gene, a draw of 0 the second's.
  take_first = random.randint(0, 1) == 1
  if take_first:
    return first_flag
  return second_flag


def RandomMutate(specs, flag_set, mutation_rate):
  """Randomly mutate the content of a task.

  Each spec in specs is selected for mutation independently with probability
  mutation_rate.  A selected spec is treated as follows:
    * It is not in flag_set: the flag is turned on via Flag(spec).
    * It is in flag_set and flags.Search(spec) finds no numeric match (a
      boolean flag): the flag is dropped.
    * It is in flag_set and numeric: the value is replaced by one of
      value - 1, value, or value + 1 (the latter only while it stays below
      the 'end' of the spec).  The flag is dropped when the new value falls
      just below the 'start' of the spec.
  A spec that is not selected keeps the flag that flag_set holds for it, if
  any.  Flags of flag_set whose spec does not appear in specs are not carried
  over, since only specs is iterated.

  Args:
    specs: A list of spec from which the flag set is created.
    flag_set: The current flag set being mutated.  It is only read, through
      'spec in flag_set' and 'flag_set[spec]'.
    mutation_rate: What fraction of genes to mutate.  A rate of 0 (or less)
      mutates nothing; a rate of 1 (or more) mutates every spec.

  Returns:
    A Genetic Task constructed by randomly mutating the input flag set.
  """

  results_flags = []

  for spec in specs:
    # random.random() is uniform over [0.0, 1.0), so this selects the spec
    # with probability mutation_rate.  Unlike 1 / mutation_rate this is also
    # well defined for a rate of 0.
    if random.random() >= mutation_rate:
      # Not selected for mutation: the existing flag survives unchanged.
      if spec in flag_set:
        results_flags.append(flag_set[spec])
      continue

    # If the flag is not already in the flag set, it is added.
    if spec not in flag_set:
      results_flags.append(Flag(spec))
      continue

    # If the flag is already in the flag set, it is mutated.
    numeric_flag_match = flags.Search(spec)

    # The value of a numeric flag will be changed, and a boolean flag will be
    # dropped.
    if not numeric_flag_match:
      continue

    value = flag_set[spec].GetValue()
    start = int(numeric_flag_match.group('start'))
    end = int(numeric_flag_match.group('end'))

    # Randomly select a nearby value of the current value of the flag.  The
    # upper neighbour is offered only while it stays below 'end'; the lower
    # neighbour is always offered and filtered out below if it is too small.
    candidates = [value, value - 1]
    if value + 1 < end:
      candidates.append(value + 1)
    value = random.choice(candidates)

    # If the value is smaller than the start of the spec, this flag will be
    # dropped.
    if value != start - 1:
      results_flags.append(Flag(spec, value))

  return GATask(FlagSet(results_flags))


class GATask(Task):
  """A Task that can be bred with another task by the genetic algorithm."""

  def __init__(self, flag_set):
    """Create the task; construction is delegated entirely to Task.

    Args:
      flag_set: The flag set handed on to Task.__init__.
    """

    Task.__init__(self, flag_set)

  def ReproduceWith(self, other, specs, mutation_rate):
    """Breed a child task from this task and another one.

    Args:
      other: The task to reproduce with.  Its flags are read through
        other.GetFlags().GetFlags().
      specs: A list of spec from which the flag set is created.  Only used
        when mutation is enabled.
      mutation_rate: The mutation rate forwarded to RandomMutate.  Set to 0 to
        disable mutation.

    Returns:
      A GA task made by mixing self with other, mutated afterwards when
      mutation_rate is non-zero.
    """

    # The flag mappings of both parents, keyed by flag.
    father_flags = self.GetFlags().GetFlags()
    mother_flags = other.GetFlags().GetFlags()

    # Partition the flags into those owned by exactly one parent and those
    # present in both.
    father_only = [spec for spec in father_flags if spec not in mother_flags]
    shared = [spec for spec in father_flags if spec in mother_flags]
    mother_only = [spec for spec in mother_flags if spec not in father_flags]

    child_flags = []

    # A flag owned by a single parent is inherited on a coin toss.
    for spec in father_only:
      if random.randint(0, 1):
        child_flags.append(father_flags[spec])

    for spec in mother_only:
      if random.randint(0, 1):
        child_flags.append(mother_flags[spec])

    # A flag owned by both parents is always inherited; which parent's value
    # it takes is decided by CrossoverWith.
    for spec in shared:
      child_flags.append(CrossoverWith(father_flags[spec], mother_flags[spec]))

    child_set = FlagSet(child_flags)

    # Without mutation the crossover result is the child as is.
    if not mutation_rate:
      return GATask(child_set)

    return RandomMutate(specs, child_set, mutation_rate)


class GAGeneration(Generation):
  """The Genetic Algorithm.

  The tuning parameters are class attributes shared by every generation; they
  are expected to be set once through InitMetaData before the first
  generation is used.
  """

  # The value checks whether the algorithm has converged and arrives at a fixed
  # point. If STOP_THRESHOLD of generations have not seen any performance
  # improvement, the Genetic Algorithm stops.
  STOP_THRESHOLD = None

  # Number of tasks in each generation.
  NUM_CHROMOSOMES = None

  # The value checks whether the algorithm has converged and arrives at a fixed
  # point. If NUM_TRIALS of trials have been attempted to generate a new task
  # without a success, the Genetic Algorithm stops.
  NUM_TRIALS = None

  # The flags that can be used to generate new tasks.
  SPECS = None

  # What fraction of genes to mutate. 0 disables mutation.
  MUTATION_RATE = 0

  @staticmethod
  def InitMetaData(stop_threshold, num_chromosomes, num_trials, specs,
                   mutation_rate):
    """Set up the meta data for the Genetic Algorithm.

    The values are stored on the GAGeneration class itself, so they apply to
    every generation.

    Args:
      stop_threshold: The number of consecutive generations without any
        performance improvement after which the Genetic Algorithm stops.
      num_chromosomes: Number of tasks in each generation.
      num_trials: The number of unsuccessful attempts to generate a new task
        after which the Genetic Algorithm stops.
      specs: The flags that can be used to generate new tasks.
      mutation_rate: What fraction of genes to mutate.
    """

    GAGeneration.STOP_THRESHOLD = stop_threshold
    GAGeneration.NUM_CHROMOSOMES = num_chromosomes
    GAGeneration.NUM_TRIALS = num_trials
    GAGeneration.SPECS = specs
    GAGeneration.MUTATION_RATE = mutation_rate

  def __init__(self, tasks, parents, total_stucks):
    """Set up one generation of the Genetic Algorithm.

    Args:
      tasks: A set of tasks to be run.
      parents: A set of tasks from which this new generation is produced. This
        set also contains the best tasks generated so far.
      total_stucks: The number of generations that have not seen improvement.
        IsImproved reports False once total_stucks has reached the
        STOP_THRESHOLD defined in the GAGeneration class.
    """

    Generation.__init__(self, tasks, parents)
    self._total_stucks = total_stucks

  def IsImproved(self):
    """True if this generation has improvement upon its parent generation.

    Besides answering the question, this method maintains the stuck counter:
    the counter is reset on an improvement and incremented otherwise.

    Returns:
      False only when there is no improvement and the stuck counter has
      already reached STOP_THRESHOLD; True in every other case, including a
      generation that has no parents.
    """

    tasks = self.Pool()
    parents = self.CandidatePool()

    # The first generation does not have parents.
    if not parents:
      return True

    # Find out whether a task has improvement upon the best task in the
    # parent generation. The best task is the one with the smallest test
    # result.
    result_of = lambda task: task.GetTestResult()
    best_parent = min(parents, key=result_of)
    best_current = min(tasks, key=result_of)

    # At least one task has improvement.
    if best_current.IsImproved(best_parent):
      self._total_stucks = 0
      return True

    # If STOP_THRESHOLD of generations have no improvement, the algorithm stops.
    if self._total_stucks >= GAGeneration.STOP_THRESHOLD:
      return False

    self._total_stucks += 1
    return True

  def Next(self, cache):
    """Calculate the next generation.

    Generate a new generation from a set of tasks. This set contains the best
    set seen so far and the tasks executed in the parent generation.

    Children are first produced by crossover, one per retained task. If that
    does not yield NUM_CHROMOSOMES new tasks, the remainder is filled with
    mutated copies of the retained tasks.

    Args:
      cache: A set of tasks that have been generated before. A child that is
        already in the cache is rejected.

    Returns:
      A list holding the single next generation, or an empty list when
      NUM_CHROMOSOMES new tasks could not be produced.
    """

    target_len = GAGeneration.NUM_CHROMOSOMES
    max_trials = GAGeneration.NUM_TRIALS
    specs = GAGeneration.SPECS
    mutation_rate = GAGeneration.MUTATION_RATE

    # Collect the tasks of this generation together with its parents. This
    # collection will be used to produce a new generation of tasks.
    gen_tasks = list(self.Pool())

    parents = self.CandidatePool()
    if parents:
      gen_tasks.extend(parents)

    # A set of tasks that are the best. This set will be used as the parent
    # generation to produce the next generation.
    sort_func = lambda task: task.GetTestResult()
    retained_tasks = sorted(gen_tasks, key=sort_func)[:target_len]

    child_pool = set()

    # Crossover: every retained task gets up to max_trials attempts to produce
    # one child that has not been seen before.
    for father in retained_tasks:
      num_trials = 0
      while num_trials < max_trials:
        # Randomly select another parent.
        mother = random.choice(retained_tasks)
        child = mother.ReproduceWith(father, specs, mutation_rate)
        if child not in child_pool and child not in cache:
          child_pool.add(child)
          break
        num_trials += 1

    # Mutation: fill the remaining slots with mutated copies of the retained
    # tasks. This is skipped when mutation is disabled, as no new task can
    # come out of it, and when there is nothing to mutate, as the loop could
    # then never make progress.
    num_trials = 0
    can_mutate = bool(mutation_rate) and bool(retained_tasks)

    while (can_mutate and len(child_pool) < target_len and
           num_trials < max_trials):
      for keep_task in retained_tasks:
        child = RandomMutate(specs, keep_task.GetFlags(), mutation_rate)
        if child in child_pool or child in cache:
          num_trials += 1
          # Give up as soon as the budget of failed attempts is used up.
          if num_trials >= max_trials:
            break
          continue

        child_pool.add(child)
        if len(child_pool) >= target_len:
          break

    # If a full set of new tasks could not be generated, the algorithm stops.
    # The decision is based on the pool size rather than on the trial counter,
    # so that a pool completed on the very last attempt is not thrown away.
    if len(child_pool) < target_len:
      return []

    # Crossover adds at most one child per retained task and the mutation
    # loop stops at target_len, so the pool can never be larger.
    assert len(child_pool) == target_len

    return [GAGeneration(child_pool, set(retained_tasks), self._total_stucks)]