# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """The hill genetic algorithm. Part of the Chrome build flags optimization. """ __author__ = 'yuhenglong@google.com (Yuheng Long)' import random import flags from flags import Flag from flags import FlagSet from generation import Generation from task import Task def CrossoverWith(first_flag, second_flag): """Get a crossed over gene. At present, this just picks either/or of these values. However, it could be implemented as an integer maskover effort, if required. Args: first_flag: The first gene (Flag) to cross over with. second_flag: The second gene (Flag) to cross over with. Returns: A Flag that can be considered appropriately randomly blended between the first and second input flag. """ return first_flag if random.randint(0, 1) else second_flag def RandomMutate(specs, flag_set, mutation_rate): """Randomly mutate the content of a task. Args: specs: A list of spec from which the flag set is created. flag_set: The current flag set being mutated mutation_rate: What fraction of genes to mutate. Returns: A Genetic Task constructed by randomly mutating the input flag set. """ results_flags = [] for spec in specs: # Randomly choose whether this flag should be mutated. if random.randint(0, int(1 / mutation_rate)): continue # If the flag is not already in the flag set, it is added. if spec not in flag_set: results_flags.append(Flag(spec)) continue # If the flag is already in the flag set, it is mutated. numeric_flag_match = flags.Search(spec) # The value of a numeric flag will be changed, and a boolean flag will be # dropped. if not numeric_flag_match: continue value = flag_set[spec].GetValue() # Randomly select a nearby value of the current value of the flag. rand_arr = [value] if value + 1 < int(numeric_flag_match.group('end')): rand_arr.append(value + 1) rand_arr.append(value - 1) value = random.sample(rand_arr, 1)[0] # If the value is smaller than the start of the spec, this flag will be # dropped. if value != int(numeric_flag_match.group('start')) - 1: results_flags.append(Flag(spec, value)) return GATask(FlagSet(results_flags)) class GATask(Task): def __init__(self, flag_set): Task.__init__(self, flag_set) def ReproduceWith(self, other, specs, mutation_rate): """Reproduce with other FlagSet. Args: other: A FlagSet to reproduce with. specs: A list of spec from which the flag set is created. mutation_rate: one in mutation_rate flags will be mutated (replaced by a random version of the same flag, instead of one from either of the parents). Set to 0 to disable mutation. Returns: A GA task made by mixing self with other. """ # Get the flag dictionary. father_flags = self.GetFlags().GetFlags() mother_flags = other.GetFlags().GetFlags() # Flags that are common in both parents and flags that belong to only one # parent. self_flags = [] other_flags = [] common_flags = [] # Find out flags that are common to both parent and flags that belong soly # to one parent. for self_flag in father_flags: if self_flag in mother_flags: common_flags.append(self_flag) else: self_flags.append(self_flag) for other_flag in mother_flags: if other_flag not in father_flags: other_flags.append(other_flag) # Randomly select flags that belong to only one parent. output_flags = [father_flags[f] for f in self_flags if random.randint(0, 1)] others = [mother_flags[f] for f in other_flags if random.randint(0, 1)] output_flags.extend(others) # Turn on flags that belong to both parent. Randomly choose the value of the # flag from either parent. for flag in common_flags: output_flags.append(CrossoverWith(father_flags[flag], mother_flags[flag])) # Mutate flags if mutation_rate: return RandomMutate(specs, FlagSet(output_flags), mutation_rate) return GATask(FlagSet(output_flags)) class GAGeneration(Generation): """The Genetic Algorithm.""" # The value checks whether the algorithm has converged and arrives at a fixed # point. If STOP_THRESHOLD of generations have not seen any performance # improvement, the Genetic Algorithm stops. STOP_THRESHOLD = None # Number of tasks in each generation. NUM_CHROMOSOMES = None # The value checks whether the algorithm has converged and arrives at a fixed # point. If NUM_TRIALS of trials have been attempted to generate a new task # without a success, the Genetic Algorithm stops. NUM_TRIALS = None # The flags that can be used to generate new tasks. SPECS = None # What fraction of genes to mutate. MUTATION_RATE = 0 @staticmethod def InitMetaData(stop_threshold, num_chromosomes, num_trials, specs, mutation_rate): """Set up the meta data for the Genetic Algorithm. Args: stop_threshold: The number of generations, upon which no performance has seen, the Genetic Algorithm stops. num_chromosomes: Number of tasks in each generation. num_trials: The number of trials, upon which new task has been tried to generated without success, the Genetic Algorithm stops. specs: The flags that can be used to generate new tasks. mutation_rate: What fraction of genes to mutate. """ GAGeneration.STOP_THRESHOLD = stop_threshold GAGeneration.NUM_CHROMOSOMES = num_chromosomes GAGeneration.NUM_TRIALS = num_trials GAGeneration.SPECS = specs GAGeneration.MUTATION_RATE = mutation_rate def __init__(self, tasks, parents, total_stucks): """Set up the meta data for the Genetic Algorithm. Args: tasks: A set of tasks to be run. parents: A set of tasks from which this new generation is produced. This set also contains the best tasks generated so far. total_stucks: The number of generations that have not seen improvement. The Genetic Algorithm will stop once the total_stucks equals to NUM_TRIALS defined in the GAGeneration class. """ Generation.__init__(self, tasks, parents) self._total_stucks = total_stucks def IsImproved(self): """True if this generation has improvement upon its parent generation.""" tasks = self.Pool() parents = self.CandidatePool() # The first generate does not have parents. if not parents: return True # Found out whether a task has improvement upon the best task in the # parent generation. best_parent = sorted(parents, key=lambda task: task.GetTestResult())[0] best_current = sorted(tasks, key=lambda task: task.GetTestResult())[0] # At least one task has improvement. if best_current.IsImproved(best_parent): self._total_stucks = 0 return True # If STOP_THRESHOLD of generations have no improvement, the algorithm stops. if self._total_stucks >= GAGeneration.STOP_THRESHOLD: return False self._total_stucks += 1 return True def Next(self, cache): """Calculate the next generation. Generate a new generation from the a set of tasks. This set contains the best set seen so far and the tasks executed in the parent generation. Args: cache: A set of tasks that have been generated before. Returns: A set of new generations. """ target_len = GAGeneration.NUM_CHROMOSOMES specs = GAGeneration.SPECS mutation_rate = GAGeneration.MUTATION_RATE # Collect a set of size target_len of tasks. This set will be used to # produce a new generation of tasks. gen_tasks = [task for task in self.Pool()] parents = self.CandidatePool() if parents: gen_tasks.extend(parents) # A set of tasks that are the best. This set will be used as the parent # generation to produce the next generation. sort_func = lambda task: task.GetTestResult() retained_tasks = sorted(gen_tasks, key=sort_func)[:target_len] child_pool = set() for father in retained_tasks: num_trials = 0 # Try num_trials times to produce a new child. while num_trials < GAGeneration.NUM_TRIALS: # Randomly select another parent. mother = random.choice(retained_tasks) # Cross over. child = mother.ReproduceWith(father, specs, mutation_rate) if child not in child_pool and child not in cache: child_pool.add(child) break else: num_trials += 1 num_trials = 0 while len(child_pool) < target_len and num_trials < GAGeneration.NUM_TRIALS: for keep_task in retained_tasks: # Mutation. child = RandomMutate(specs, keep_task.GetFlags(), mutation_rate) if child not in child_pool and child not in cache: child_pool.add(child) if len(child_pool) >= target_len: break else: num_trials += 1 # If NUM_TRIALS of tries have been attempted without generating a set of new # tasks, the algorithm stops. if num_trials >= GAGeneration.NUM_TRIALS: return [] assert len(child_pool) == target_len return [GAGeneration(child_pool, set(retained_tasks), self._total_stucks)]