Source code for alogos._optimization.ea.algorithm

"""Data structure for a evolutionary algorithm that uses G3P systems."""

import random as _random
from numbers import Number as _Number

import pylru as _pylru

from ... import Grammar as _Grammar
from ... import _utilities
from ... import exceptions as _exceptions
from ... import systems as _systems
from ... import warnings as _warnings
from ..._utilities import argument_processing as _ap
from ..._utilities.parametrization import ParameterCollection as _ParameterCollection
from ..shared import evaluation as _evaluation
from . import database as _database
from . import operators as _operators
from . import parameters as _parameters
from . import reporting as _reporting
from . import state as _state


[docs]class EvolutionaryAlgorithm: """Evolutionary algorithm for searching an optimal string in a grammar's language. The role of this class is explained by first introducing basic concepts from optimization theory and the field of grammar-guided genetic programming. Then the usage of this evolutionary algorithm is explained. 1. Concepts from optimization theory - Optimization: In general terms, optimization means to search for the best item out of many alternatives, or in other words, an optimal solution out of many candidate solutions. - Search space: The set of all candidate solutions is called the search space. It can have a finite or infinite number of elements and there can be mathematical relationships between them, which can be used to efficiently sample the space. - Search goal: Having a best or optimal item in the space requires that there is an order on the items. Such an order can be established in different ways, but the most common one is to define an objective function, which takes an item as input and returns a number as output. This number represents a quality score or fitness of the item and puts a weak order on all items, which means that some of them can be equally good and therefore multiple optimal items may exist that share the best quality score. - Search method: There are many optimization algorithms that work very efficiently on specific search spaces with certain mathematical structure. Methods that are more general and can act on a large variety of search spaces are sometimes called metaheuristics. They are usually stochastic optimization algorithms, which means they have a random element to them and that implies that running them several times can produce different results, i.e. find candidate solutions of different quality. They don't come with any guarantee to find a good or optimal solution, but often can successfully do so in hard optimization problems and are therefore often used in practice when the search space can not be tackled with exact methods. An evolutionary algorithm is a prominent example of a population-based metaheuristic, which was originally inspired by the process of natural evolution. Genetic programming is one subdiscipline in the field of evolutionary computation that deals with optimization in spaces of programs. Grammar-guided genetic programming (G3P) is one flavor of it that allows to define the search space with a context-free grammar, which is very often used in computer science to define various kinds of formal languages, including modern programming languages like Python or Rust. 2. Concepts from grammar-guided genetic programming - A grammar can be used to define a formal language. - A formal language is a finite or infinite set of string. - A search algorithm can try to find the best item in a set according to a given search goal. - An objective function can define a search goal by assigning a score to each item in a set, so that the item with minimum or maximum score are considered the best ones and need to be found. This class provides a metaheuristic search method known as evolutionary algorithm, which was tailored for use with different grammar-guided genetic programming systems such as Grammatical Evolution (GE) and Context-Free Grammar Genetic Programming (CFG-GP). It uses following concepts that are directly relevant for the usage of this class: - Individual: An individual represents a single candidate solution and comes with a genotype, phenotype and fitness. The genotype can be modified with variation operators to give rise to new closely related genotypes and thereby move through the search space. It also allows to derive a phenotype, which is an element of the search space that in the case of grammar-guided genetic programming is simply a string of the grammar's language. The fitness is a number that indicates the quality of a candidate solution and is calculated by a user-provided objective function. - Population: A population is a collection of individuals. - Generation: A generation is one of several consecutive populations used throughout a search with an evolutionary algorithm. The search begins with the creation of a random initial population and proceeds by generating one population after the other, each based on the previous one. The idea is that the average fitness of the individuals in the generations can be increased by using variation operators to generate new, closely related individuals together with selection operators that introduce a preference for individuals with higher fitness. This is done by 1. Parent selection to choose individuals that are allowed to create offspring. 2. Crossover to create new individuals by mixing the genotypes of parent individuals. 3. Mutation to introduce slight random variations into the offspring individuals. 4. Survivor selection to choose individuals that are allowed to continue into the next generation. All of these steps can be performed by different operators and this class provides several well-known ones to modify how the search is performed. """ __slots__ = ("database", "parameters", "state", "_report") # Initialization and reset
[docs] def __init__( self, grammar, objective_function, objective, system="cfggpst", evaluator=None, **kwargs ): """Create and parameterize an evolutionary algorithm. Parameters ---------- grammar : `~alogos.Grammar` Context-free grammar that defines the search space. A grammar specifies a formal language, which is a finite or infinite set of strings. Each item of the search space is simply a string of the grammar's language. The aim is to find an optimal string according to a user-provided objective. objective_function : `~collections.abc.Callable` A function that gets an item of the search space as input and needs to return a numerical value for it as output. In other words, it gets a phenotype and returns a fitness value for it that represents how good it is. Given that the search space is a grammar's language, each candidate solution or phenotype is a string. Therefore a very simple objective function could just measure the length of each string and return that as fitness value:: def obj_fun(string): fitness = len(string) return fitness More realistic objective functions will usually evaluate the string in some way and measure how well it performs on a task. objective : `str` Goal of the optimization, which is to find a candidate solution that has either the minimum or maximum value assigned to it by the objective function. Possible values: - ``"min"``: Minimization, i.e. looking for a candidate solution with the smallest possible fitness value. - ``"max"``: Maximization, i.e. looking for a candidate solution with the highest possible fitness value. system : `str`, optional Grammar-guided gentic programming system used by the evolutionary algorithm. Possible values: - ``"cfggp"``: CFG-GP, see `~alogos.systems.cfggp`. - ``"cfggpst"``: CFG-GP-ST, see `~alogos.systems.cfggpst`. - ``"dsge"``: DSGE, see `~alogos.systems.dsge`. - ``"ge"``: GE, see `~alogos.systems.ge`. - ``"pige"``: piGE, see `~alogos.systems.pige`. - ``"whge"``: WHGE, see `~alogos.systems.whge`. The system provides following components to the search algorithm: - Genotype representation: This is a system-specific, indirect representation of a candidate solution, e.g. a list of int, a bitarray, or a tree. - Genotype-phenotype mapping: This is used to translate a genotype to a phenotype. Note that a genotype is system-specific, while a phenotype is a string of the grammar's language. - Random variation operators: These are used to move through the search space by generating new genotypes that are close or related to the original ones. This is achieved by randomly introducing slight changes into a genotype (mutation) or recombining multiple genotypes into new ones (crossover). evaluator : `~collections.abc.Callable`, optional A function that gets a function together with a list of items as input and needs to return a list of new items as output. The task of this function is to evaluate the given function for each item in the list, and collect the return values in a new list. All function calls are assumed to be independent and may be computed serially, in parallel or in a distributed fashion. **kwargs : `dict`, optional Further keyword arguments are forwarded either to the evolutionary algorithm or to the chosen `system`. This allows to control the search process in great detail. Every parameter can also be changed later on via the attribute `parameters` of the generated instance. This can be done before starting a run with the `run` method, but also throughout a run when it is performed one step at a time by repeatedly calling the `step` method. For a description of available keyword arguments, see - Parameters for the evolutionary algorithm itself: `~alogos._optimization.ea.parameters.default_parameters` - Parameters of the chosen G3P system: - CFG-GP: `~alogos.systems.cfggp.default_parameters` - CFG-GP-ST: `~alogos.systems.cfggpst.default_parameters` - DSGE: `~alogos.systems.dsge.default_parameters` - GE: `~alogos.systems.ge.default_parameters` - piGE: `~alogos.systems.pige.default_parameters` - WHGE: `~alogos.systems.whge.default_parameters` For example, when the chosen system is GE, the creation and parametrization of the evolutionary algorithm could look like this:: import alogos as al grammar = al.Grammar("<S> ::= 1 | 22 | 333") def obj_fun(string): return len(string) ea = al.EvolutionaryAlgorithm( grammar, obj_fun, "max", system="ge", max_generations=10, max_wraps=2) ea.run() Note that in this example ``max_generations`` is a parameter of the evolutionary algorithm itself and therefore it can be used no matter which system is chosen. In contrast, ``max_wraps`` is a parameter specific to the system GE and would not be accepted if for example the system CFG-GP were chosen by passing ``system="cfggp"`` instead of ``system="ge"``. Raises ------ ParameterError If a parameter is provided that is not known to the evolutionary algorithm or to the chosen system. A list of available parameters and their default values will be printed. """ # Argument processing _ap.check_arg("grammar", grammar, types=(_Grammar,)) _ap.callable_arg("objective_function", objective_function) objective = _ap.str_arg( "objective", objective, vals=("min", "max"), to_lower=True ) system = _ap.str_arg( "system", system, vals=("cfggp", "cfggpst", "dsge", "ge", "pige", "whge"), to_lower=True, ) system = getattr(_systems, system) if evaluator is None: evaluator = _evaluation.default_evaluator else: _ap.callable_arg("evaluator", evaluator) # Parameter processing self.parameters = self._process_parameters( grammar, objective_function, objective, system, evaluator, kwargs ) # State and database (re)initialization self.reset() # Reporting if isinstance(self.parameters.verbose, bool): if self.parameters.verbose: self._report = _reporting.MinimalReporter() elif isinstance(self.parameters.verbose, _Number): if self.parameters.verbose == 1: self._report = _reporting.MinimalReporter() elif self.parameters.verbose > 1: self._report = _reporting.VerboseReporter()
def _process_parameters( self, grammar, objective_function, objective, system, evaluator, kwargs ): """Try to assign keyword arguments to predefined parameters. Raises ------ ParameterError If a keyword argument is passed whose key is not a known parameter name. """ # Dictionary of algorithm parameters and system parameters with all default values params = dict( grammar=grammar, objective_function=objective_function, objective=objective, system=system, evaluator=evaluator, ) params.update(_parameters.default_parameters) params.update(system.default_parameters) # Overwrite default values with user-provided kwargs, raise error if a name is unknown params = _ParameterCollection(params) for key, val in kwargs.items(): if key in params: params[key] = val else: _exceptions.raise_unknown_parameter_error(key, params) # Convert values if isinstance(params["verbose"], bool): params["verbose"] = int(params["verbose"]) # Check types _ap.int_arg("population_size", params["population_size"], min_incl=1) _ap.int_arg("offspring_size", params["offspring_size"], min_incl=1) _ap.int_arg("verbose", params["verbose"], min_incl=0) _ap.check_arg("database_on", params["database_on"], types=(bool,)) _ap.check_arg("database_location", params["database_location"], types=(str,)) return params # Representations def __repr__(self): """Compute the "official" string representation of the algorithm. References ---------- - https://docs.python.org/3/reference/datamodel.html#object.__repr__ """ return "<EvolutionaryAlgorithm object at {}>".format(hex(id(self))) def __str__(self): """Compute the "informal" string representation of the algorithm. References ---------- - https://docs.python.org/3/reference/datamodel.html#object.__str__ """ return str(self.state) def _repr_pretty_(self, p, cycle): """Provide rich display representation for IPython and Jupyter. References ---------- - https://ipython.readthedocs.io/en/stable/config/integrating.html#rich-display """ if cycle: p.text(repr(self)) else: p.text(str(self)) # Optimization algorithm
[docs] def step(self): """Perform a single step of the evolutionary search. Taking a single step means that one generation is added, which can happen in one of two ways: - If the run has just started, the first generation is created by initializating a population. - If the run is already ongoing, a new generation is derived from the current generation by a process that involves 1) parent selection that prefers fitter individuals, 2) generating an offspring population by applying random variation operators (mutation, crossover) to the parents and 3) survivor selection that again prefers fitter individuals. Two important notes about the behavior of this method: - It contrast to the `run` method, it does not check if any stop criterion has been met! For example, when the number of generations reaches the value provided by the ``max_generations`` parameter, calling this method will regardless add another generation. - Since this method has to be called repeatedly to perform the individual steps of a run, it enables to change the parameters of the evolutionary algorithm on the fly by modifying the `parameters` attribute between calls. This means the search can be adjusted in much greater detail than with the `run` method. For example, the mutation and crossover rates can be gradually modified during a run to reduce the amount of variation when getting closer to the goal. It is also possible to change the population size or even the objective function. Fixed throughout a run, however, are the grammar (=search space) and the G3P system (=internal representation). Returns ------- best_individual : `~alogos.systems._shared.representation.Individual` of the chosen G3P system The individual with the best fitness found so far in the entire search. This means the individual may have been discovered not in the current step but in an earlier one. """ # Start time self._store_start_time() # Check parameters self._check_parameter_consistency() if self.state.population is None: # CREATE first generation pop = self._initialize_population() # Evaluate self._evaluate_population(pop) # Store and report self._finish_generation(survivor_population=pop) else: # SELECT parents parent_population = self._select_parents(self.state.population) # CREATE offspring by random variation crossed_over_population = self._cross_over(parent_population) mutated_population = self._mutate(crossed_over_population) # Evaluate self._evaluate_population(mutated_population) # SELECT survivors survivor_population = self._select_survivors( self.state.population, mutated_population ) # Store and report self._finish_generation( parent_population, crossed_over_population, mutated_population, survivor_population, ) # Stop time self._store_stop_time() return self.state.best_individual
[docs] def run(self): """Perform one search step after another until a stop criterion is met. A user can set a single or multiple stop criteria. The algorithm will halt when one of them is fulfilled. Caution: If the criteria are set in such a way that they can never be realized, e.g. because an unreachable fitness threshold was chosen, then the run will not halt until it is interrupted from the outside. Note that after a search has halted, it is possible to change the stop criteria and call `run` again to continue the search. This can also be done multiple times. Returns ------- best_individual : `~alogos.systems._shared.representation.Individual` of the chosen G3P system The individual with the best fitness found so far in the entire search. Raises ------ ParameterError If no stop criterion was set. This prevents starting a run that clearly would not halt. Warns ----- OptimizationWarning If not a single step was taken, because some stop criterion was already fulfilled by the current search state. """ pr = self.parameters # Check if a stop criterion was set if ( pr.max_generations is None and pr.max_fitness_evaluations is None and pr.max_runtime_in_seconds is None and pr.max_or_min_fitness is None ): _exceptions.raise_stop_parameter_error() # Run step after step until a stop criterion is met gen_before = self.state.generation while not self.is_stop_criterion_met(): self.step() gen_after = self.state.generation # Warn if not a single step was performed in this call if gen_after == gen_before: _warnings._warn_no_step_in_ea_performed(self.state.generation) # Report if pr.verbose: self._report.run_end(self.state) return self.state.best_individual
[docs] def is_stop_criterion_met(self): """Check if the current search state meets any stop criterion. This method is used internally by the `run` method. It can also be useful when constructing a specialized search that repeatedly calls the `step` method, where it is the responsibility of the user to ensure a stop. Returns ------- stop : bool ``True`` if a stop criterion is met, ``False`` otherwise. """ pr = self.parameters st = self.state # Maximum number of generations if pr.max_generations is not None: if st.num_generations >= pr.max_generations: return True # Maximum number of fitness evaluations if pr.max_fitness_evaluations is not None: if st.num_phe_to_fit_evaluations >= pr.max_fitness_evaluations: return True # Maximum runtime if pr.max_runtime_in_seconds is not None: if st.runtime >= pr.max_runtime_in_seconds: return True # Fitness threshold if pr.max_or_min_fitness is not None: if st.best_individual is not None: if pr.objective == "min": if st.best_individual.fitness <= pr.max_or_min_fitness: return True elif pr.objective == "max": if st.best_individual.fitness >= pr.max_or_min_fitness: return True # No stop criterion is met return False
[docs] def reset(self): """Reset the current search state to enable a fresh start. Not all aspects of the algorithm object are simply set back to their initial state: - Parameters are not reset to their default values. If any non-default parameters were passed during object-creation or if they were set to user-provided values later via the `parameters` attribute, then these values are kept and not overwritten by default values. The purpose of this behavior is to enable repeated runs that are directly comparable. - If a database was used, it is reset, but what that means depends on its location: - If the database location is in memory, the content is entirely lost during a reset. - If the database location is a file, it is renamed during a reset and a new file is created. This means the content of a run is not lost but relocated. - Cached values are fully deleted. This means previous phenotype-to-fitness evaluations will not be reused in a follow-up run. """ # State: storage of main data generated during a search in a single object self.state = _state.State() # Cache: optional storage of least recently used (lru) mapping results for lookup if self.parameters.gen_to_phe_cache_lookup_on: self.state._gen_to_phe_cache = _pylru.lrucache( self.parameters.gen_to_phe_cache_size ) else: self.state._gen_to_phe_cache = None if self.parameters.phe_to_fit_cache_lookup_on: self.state._phe_to_fit_cache = _pylru.lrucache( self.parameters.phe_to_fit_cache_size ) else: self.state._phe_to_fit_cache = None # Database: optional storage of all data generated during a search in an SQL database if self.parameters.database_on: self.database = _database.Database( self.parameters.database_location, self.parameters.system ) else: self.database = None
# 1) Beginning def _initialize_population(self): """Create the initial population. There are different ways this can be done. Which one is used is decided by the default or user-provided parameters. """ pr = self.parameters st = self.state # Report start of initialization if pr.verbose: self._report.init_start( st.generation, st.start_time, _utilities.times.current_time_readable() ) # Generate initial population operator = self._get_operator( "Population initialization", pr.init_pop_operator, pr.system.init_population, ) pr["init_pop_size"] = pr["population_size"] initial_population = operator(pr.grammar, pr) # Set individual ids self._assign_new_ids(initial_population) # Set state st.population = initial_population # Report end of initialization if pr.verbose: self._report.init_end(len(initial_population)) return initial_population # 2) Evaluation def _evaluate_population(self, population): """Calculate mappings for each individual. 1) Genotype-to-phenotype mapping: A system-specific genotype is translated into a system-independent phenotype. 2) Phenotype-to-fitness mapping: A phenotype, which is a string of the grammar's language, gets a fitness assigned to it by evaluating the user-provided objective function on it. In realistic problems this can be a costly operation, therefore it makes sense to store the result in a limited cache or unlimited database and reuse it instead of performing the calculation again. The behavior is chosen by the default or user-provided parameters. """ pr = self.parameters # Genotype-to-phenotype mapping if pr.verbose: self._report.map_gen_phe() self._calculate_phenotypes(population) # Phenotype-to-fitness mapping if pr.verbose: self._report.map_phe_fit() self._calculate_fitnesses(population) def _calculate_phenotypes(self, population): """Perform the genotype-to_phenotype mapping. Possible crashes are prevented by wrapping the function such that default values are returned in case of an error or invalid return value. """ pr = self.parameters st = self.state cache = st._gen_to_phe_cache cache_on = pr.gen_to_phe_cache_lookup_on # Lookup or calculate phenotype to fitness & details mapping gen_phe_map = dict() # 1) Determine unique genotypes unique_genotypes = {ind.genotype for ind in population} remaining_genotypes = unique_genotypes.copy() n_unique = len(unique_genotypes) # 2) For each unique genotype, look up if it is available in cache if cache_on: for gen in unique_genotypes: try: gen_phe_map[gen] = cache[gen] remaining_genotypes.remove(gen) except KeyError: pass n_after_cache = len(remaining_genotypes) # 3) For each remaining genotype, calculate the mapping if remaining_genotypes: # Evaluate forward = pr.system.mapping.forward for gen in remaining_genotypes: try: phe = forward(pr.grammar, gen) except _exceptions.MappingError: phe = None gen_phe_map[gen] = phe # Update state st.num_gen_to_phe_evaluations += len(remaining_genotypes) # Assign values from lookup or calculation for ind in population: ind.phenotype = gen_phe_map[ind.genotype] # Update cache if cache_on: for gen in unique_genotypes: cache[gen] = gen_phe_map[gen] # Report if pr.verbose: self._report.calc_phe( len(population), n_unique, n_unique - n_after_cache, n_after_cache ) def _calculate_fitnesses(self, population): """Perform the phenotype-to-fitness mapping. Possible crashes are prevented by wrapping the function such that default values are returned in case of an error or invalid return value. """ pr = self.parameters st = self.state cache = st._phe_to_fit_cache cache_on = pr.phe_to_fit_cache_lookup_on db_on = pr.phe_to_fit_database_lookup_on and pr.database_on # Lookup or calculate phenotype to fitness & details mapping phe_fit_map = dict() # 1) Determine unique phenotypes unique_phenotypes = [] # list instead set to preserve evaluation order for ind in population: phe = ind.phenotype if phe not in unique_phenotypes: unique_phenotypes.append(phe) remaining_phenotypes = [] n_unique = len(unique_phenotypes) # 2) For each unique phenotype, look up if it is available in cache if cache_on: for phe in unique_phenotypes: try: phe_fit_map[phe] = cache[phe] except KeyError: remaining_phenotypes.append(phe) else: remaining_phenotypes = unique_phenotypes n_after_cache = len(remaining_phenotypes) # 3) For each remaining phenotype, look up if it is available in database # because evaluation of the objective function could be very costly if db_on and remaining_phenotypes: for phe, fit_det in self.database._lookup_phenotype_evaluations( remaining_phenotypes ): phe_fit_map[phe] = fit_det remaining_phenotypes.remove(phe) n_after_db = len(remaining_phenotypes) # 4) For each remaining phenotype, calculate the objective function if remaining_phenotypes: # Evaluate func = _evaluation.get_robust_fitness_function( pr.objective_function, pr.objective ) results = pr.evaluator(func, remaining_phenotypes) for phe, fit_det in zip(remaining_phenotypes, results): phe_fit_map[phe] = fit_det # Update state st.num_phe_to_fit_evaluations += len(remaining_phenotypes) # Assign values from lookup or calculation for ind in population: ind.fitness, ind.details["evaluation"] = phe_fit_map[ind.phenotype] # Update cache if cache_on: for phe in unique_phenotypes: cache[phe] = phe_fit_map[phe] # Report if pr.verbose: self._report.calc_fit( len(population), n_unique, n_unique - n_after_cache, n_after_cache - n_after_db, n_after_db, ) def _assign_new_ids(self, population): """Assign a unique id to each individual created in a search.""" st = self.state for ind in population: ind.details["id"] = st.num_individuals st.num_individuals += 1 # 3) Selection def _select_parents(self, current_population): """Select parent individuals from given population. There are different ways this can be done. Which one is used is decided by the default or user-provided parameters. The population of selected parents contains copies of the original individuals, so they have their own ids and can be tracked for result analysis. """ pr = self.parameters # Select parent individuals operator = self._get_operator( "Parent selection", pr.parent_selection_operator, _operators.selection ) selected_individuals = operator( individuals=current_population.individuals, sample_size=pr.offspring_size, objective=pr.objective, parameters=pr, state=self.state, ) # Create parent population new_individuals = [] for ind in selected_individuals: new_ind = ind.copy() new_ind.details["parent_ids"] = [ind.details["id"]] new_individuals.append(new_ind) parent_population = pr.system.representation.Population(new_individuals) self._assign_new_ids(parent_population) # Report if pr.verbose: self._report.select_par( self.state.generation, _utilities.times.current_time_readable(), len(current_population), len(parent_population), ) return parent_population def _select_survivors(self, old_population, offspring_population): """Select survivor individuals from two given populations. There are different ways this can be done. Which one is used is decided by the default or user-provided parameters. The population of selected survivors contains copies of the original individuals, so they have their own ids and can be tracked for result analysis. """ pr = self.parameters # Create a pool from which survivor individuals will be selected operator = self._get_operator( "Generation model", pr.generation_model, _operators.generation_model, ) pool = operator(old_population, offspring_population, pr) # Select survivor individuals operator = self._get_operator( "Survivor selection", pr.survivor_selection_operator, _operators.selection ) selected_individuals = operator( individuals=pool, sample_size=pr.population_size, objective=pr.objective, parameters=pr, state=self.state, ) # Create survivor population new_individuals = [] for ind in selected_individuals: new_ind = ind.copy() new_ind.details["parent_ids"] = [ind.details["id"]] new_individuals.append(new_ind) survivor_population = pr.system.representation.Population(new_individuals) self._assign_new_ids(survivor_population) # Report if pr.verbose: self._report.select_sur( len(old_population), len(offspring_population), len(survivor_population) ) return survivor_population # 4) Variation def _check_parameter_consistency(self): """Check if the current parameters are consistent. This is necessary because the initial parametrization has to be checked, but also because a user can change it between calls to the step method. An example of an inconsistent parametrization is when all variation operators are turned off, because in that case no new individuals are generated and therefore the search can not progress. """ pr = self.parameters if pr.crossover_operator is None and pr.mutation_operator is None: _exceptions.raise_missing_variation_error() def _cross_over(self, parent_population): """Perform crossover events between with given individuals. There are different ways this can be done. Which one is used is decided by the default or user-provided parameters. """ pr = self.parameters st = self.state # Optional skip if pr.crossover_operator is None: return parent_population # Cross-over operator = self._get_operator( "Crossover", pr.crossover_operator, pr.system.crossover ) crossed_over_individuals = [] n = len(parent_population) while len(crossed_over_individuals) < n: if n > 2: parent0, parent1 = _random.sample(parent_population.individuals, 2) elif n == 2: parent0, parent1 = parent_population.individuals else: parent0 = parent1 = parent_population.individuals[0] child0_gen, child1_gen = operator( pr.grammar, parent0.genotype.copy(), parent1.genotype.copy(), pr ) child0 = pr.system.representation.Individual( genotype=child0_gen, details=dict( id=st.num_individuals, parent_ids=[parent0.details["id"], parent1.details["id"]], ), ) st.num_individuals += 1 child1 = pr.system.representation.Individual( genotype=child1_gen, details=dict( id=st.num_individuals, parent_ids=[parent0.details["id"], parent1.details["id"]], ), ) st.num_individuals += 1 crossed_over_individuals.append(child0) crossed_over_individuals.append(child1) crossed_over_population = pr.system.representation.Population( crossed_over_individuals ) # Report if pr.verbose: self._report.cross_over( len(parent_population), len(crossed_over_population) ) return crossed_over_population def _mutate(self, crossed_over_population): """Perform mutation events on given individuals. There are different ways this can be done. Which one is used is decided by the default or user-provided parameters. """ pr = self.parameters st = self.state # Optional skip if pr.mutation_operator is None: return crossed_over_population # Mutation operator = self._get_operator( "Mutation", pr.mutation_operator, pr.system.mutation ) mutated_individuals = [] for ind in crossed_over_population: new_gen = operator(pr.grammar, ind.genotype, pr) new_ind = pr.system.representation.Individual( genotype=new_gen, details=dict( id=st.num_individuals, parent_ids=[ind.details["id"]], ), ) st.num_individuals += 1 mutated_individuals.append(new_ind) mutated_population = pr.system.representation.Population(mutated_individuals) # Report if pr.verbose: self._report.mutate(len(crossed_over_population), len(mutated_population)) return mutated_population # 5) End def _finish_generation( self, parent_population=None, crossed_over_population=None, mutated_population=None, survivor_population=None, ): """Process and optionally store the given individuals.""" pr = self.parameters st = self.state # Optional: Storage to database if pr.database_on: db = self.database if parent_population: db._store_population( "parent_selection", parent_population, st.generation ) if crossed_over_population and pr.crossover_operator is not None: db._store_population( "crossover", crossed_over_population, st.generation ) if mutated_population and pr.mutation_operator is not None: db._store_population("mutation", mutated_population, st.generation) if survivor_population: db._store_population("main", survivor_population, st.generation) # Update state if ( crossed_over_population is not None and pr.crossover_operator is not None and pr.mutation_operator is None ): # Crossover population has only been evaluated if it was not followed by mutation self._store_best_and_worst_ind(crossed_over_population) if mutated_population is not None and pr.mutation_operator is not None: self._store_best_and_worst_ind(mutated_population) self._store_best_and_worst_ind(survivor_population) st.population = survivor_population st.generation += 1 st.num_generations += 1 # Report if pr.verbose: self._report.gen_end(self.state) # State management def _store_start_time(self): """Add the start time of the current step to the list of all start times.""" self.state.start_timestamps.append(_utilities.times.current_time_unix()) def _store_stop_time(self): """Add the stop time of the current step to the list of all stop times.""" self.state.stop_timestamps.append(_utilities.times.current_time_unix()) def _store_best_and_worst_ind(self, pop): """Remember individuals with min/max/best fitness in an entire run. Note that depending on the search strategy, it is possible that these individuals get lost throughout a run, i.e. they are no longer part of the latest generation. It is important, however, that these few special individuals are remembered, so the best result can reliably be returned at the end of the run. """ obj = self.parameters.objective st = self.state # Set min and max individual, not depending on obective if st.min_individual is None and pop: st.min_individual = pop[0] if st.max_individual is None and pop: st.max_individual = pop[0] for ind in pop: if ind.less_than(st.min_individual, obj): st.min_individual = ind if ind.greater_than(st.max_individual, obj): st.max_individual = ind # Set best individual, depending on obective if obj == "min": st.best_individual = st.min_individual else: st.best_individual = st.max_individual @staticmethod def _get_operator(description, name, location): """Fetch an operator function by its name.""" try: return getattr(location, name) except Exception: _exceptions.raise_operator_lookup_error(description, name, location)