import copy as _copy
import json as _json
import os as _os
import sqlite3 as _sqlite3
from collections.abc import Iterable as _Iterable
from ... import _utilities
from ... import exceptions as _exceptions
from ... import warnings as _warnings
from ..._utilities import argument_processing as _ap
from ..._utilities.database_management import Sqlite3Wrapper as _Sqlite3Wrapper
from ..._utilities.operating_system import NEWLINE as _NEWLINE
from . import database as _database
from . import plots as _plots
[docs]class Database:
"""Database wrapper for easily storing algorithm results."""
__slots__ = ("_location", "_system", "_dbms", "_cache", "_deserializer")
def __init__(self, location, system=None):
    """Create a database object referring to a file-based or in-memory SQLite3 database.

    Parameters
    ----------
    location
        Handed unchanged to the SQLite3 wrapper, which creates or
        connects to the database.
    system : str, optional
        Identifier handed to the deserializer that converts database
        entries back to Python objects. If ``None``, ``"cfggpst"``
        is used.

    """
    if system is None:
        system = "cfggpst"
    self._location = location
    # The "_system" slot is declared on the class but was never assigned,
    # so any later read of it would have raised AttributeError
    self._system = system

    # Cache: Store results of repeated calculations, but recalculate after database changes
    self._cache = dict()

    # Create or connect to SQLite3 database, try to create tables and views
    self._dbms = _Sqlite3Wrapper(self._location)
    self._try_creating_tables()
    self._try_creating_views()

    # Deserializer: Convert database entries back to Python objects
    self._deserializer = Deserializer(system)
# Representations
def __repr__(self):
"""Compute the "official" string representation of the database wrapper."""
return "<EvolutionaryAlgorithmDatabase object at {}>".format(hex(id(self)))
def __str__(self):
    """Compute the "informal" string representation of the database wrapper.

    The result is a framed, multi-line summary: counts, population
    sizes, minimum and maximum fitness, the first individual with
    each extreme fitness and up to five phenotypes sharing it.
    Values that cannot be read because of a ``DatabaseError`` are
    shown as ``"not available"``.

    """
    shown_limit = 5
    report = []

    def emit(template, *values):
        # Every template ends with a placeholder that receives the newline
        report.append(template.format(*values, _NEWLINE))

    # Read data
    num_changes = self._dbms.get_num_changes()
    num_generations = self.num_generations()
    num_individuals = self.num_individuals()
    num_genotypes = self.num_genotypes()
    num_phenotypes = self.num_phenotypes()
    num_fitness = self.num_fitnesses()
    try:
        pop_size_min = self.population_size_min()
        pop_size_max = self.population_size_max()
    except _exceptions.DatabaseError:
        pop_size_min = pop_size_max = "not available"

    min_individuals = self.individuals_with_min_fitness()
    min_genotypes = self.genotypes_with_min_fitness()
    min_phenotypes = self.phenotypes_with_min_fitness()
    try:
        min_fitness = self.fitness_min()
        max_fitness = self.fitness_max()
    except _exceptions.DatabaseError:
        min_fitness = max_fitness = "not available"
    max_individuals = self.individuals_with_max_fitness()
    max_genotypes = self.genotypes_with_max_fitness()
    max_phenotypes = self.phenotypes_with_max_fitness()

    # Write message: header and counts
    emit("╭─ Database of the evolutionary search{}")
    emit("│ Number of changes ............ {}{}", num_changes)
    emit("│ Number of generations ........ {}{}", num_generations)
    emit("│ Number of individuals ........ {}{}", num_individuals)
    emit("│ Number of unique genotypes ... {}{}", num_genotypes)
    emit("│ Number of unique phenotypes .. {}{}", num_phenotypes)
    emit("│ Number of unique fitnesses ... {}{}", num_fitness)
    emit("│ Minimum population size ...... {}{}", pop_size_min)
    emit("│ Maximum population size ...... {}{}", pop_size_max)

    # Write message: fitness extremes, with sharing info only for float values
    if isinstance(min_fitness, float):
        shared = "│ shared by {} individuals, {} genotypes, {} phenotypes{}"
        emit("│ Minimum fitness .............. {:.6f}{}", min_fitness)
        emit(shared, len(min_individuals), len(min_genotypes), len(min_phenotypes))
        emit("│ Maximum fitness .............. {:.6f}{}", max_fitness)
        emit(shared, len(max_individuals), len(max_genotypes), len(max_phenotypes))
    else:
        emit("│ Minimum fitness .............. {}{}", min_fitness)
        emit("│ Maximum fitness .............. {}{}", max_fitness)

    # Write message: details for minimum first, then maximum
    sections = (
        (
            min_individuals,
            min_phenotypes,
            min_fitness,
            "│ First individual with minimum fitness {:.6f}{}",
            "│ First {} of {} phenotypes with minimum fitness {:.6f}{}",
        ),
        (
            max_individuals,
            max_phenotypes,
            max_fitness,
            "│ First individual with maximum fitness {:.6f}{}",
            "│ First {} of {} phenotypes with maximum fitness {:.6f}{}",
        ),
    )
    for individuals, phenotypes, fitness, ind_header, phe_header in sections:
        # First individual with this fitness
        if individuals:
            emit("│{}")
            emit(ind_header, fitness)
            for row in str(individuals[0]).splitlines():
                emit("│ {}{}", row)
        # All phenotypes with this fitness, truncated to the display limit
        total = len(phenotypes)
        if total > 1:
            emit("│{}")
            emit(phe_header, min(total, shown_limit), total, fitness)
            for phenotype in phenotypes[:shown_limit]:
                emit("│ Phenotype: {}{}", phenotype)
            if total > shown_limit:
                emit("│ ...{}")

    report.append("╰─")
    return "".join(report)
def _repr_pretty_(self, p, cycle):
"""Provide rich display representation for IPython and Jupyter."""
if cycle:
p.text(repr(self))
else:
p.text(str(self))
# Creation of tables and views
def _try_creating_tables(self):
"""Try to create tables in the database."""
try:
# Table 1: search - contains the main data produced by a search
query = (
"CREATE TABLE search ( "
" individual_id INTEGER PRIMARY KEY, "
" parent_ids TEXT, "
" generation INTEGER, "
" label TEXT, "
" genotype TEXT, "
" FOREIGN KEY(genotype) REFERENCES genotype_phenotype_mapping(genotype)"
");"
)
self._dbms.execute_query(query)
# Table 2: genotype_phenotype_mapping - contains repetitive mapping data
query = (
"CREATE TABLE genotype_phenotype_mapping ( "
" genotype TEXT PRIMARY KEY, "
" phenotype TEXT, "
" FOREIGN KEY(phenotype) REFERENCES phenotype_fitness_mapping(phenotype)"
");"
)
self._dbms.execute_query(query)
# Table 3: phenotype_fitness_mapping - contains repetitive mapping data
query = (
"CREATE TABLE phenotype_fitness_mapping ( "
" phenotype TEXT PRIMARY KEY, "
" fitness REAL,"
" details TEXT "
");"
)
self._dbms.execute_query(query)
except _sqlite3.OperationalError:
# If tables already exist, ignore the error on trying to create them again
pass
def _try_creating_views(self):
"""Try to create views in the database.
These views join some tables for simpler access to their data.
"""
try:
# View 1: full_search - combines search data with repetitive mapping data
query = (
"CREATE VIEW full_search AS "
"SELECT "
" search.individual_id, "
" search.parent_ids, "
" search.generation, "
" search.label, "
" search.genotype, "
" genotype_phenotype_mapping.phenotype, "
" phenotype_fitness_mapping.fitness, "
" phenotype_fitness_mapping.details "
"FROM search "
"LEFT JOIN genotype_phenotype_mapping "
" ON genotype_phenotype_mapping.genotype=search.genotype "
"LEFT JOIN phenotype_fitness_mapping "
" ON phenotype_fitness_mapping.phenotype=genotype_phenotype_mapping.phenotype;"
)
self._dbms.execute_query(query)
except _sqlite3.OperationalError:
# If view already exists, ignore the error on trying to create it again
pass
# Storing database entries during a search, loading them during analysis
def _store_population(self, label, population, generation):
"""Store all individuals of a population in the database.
Notes
-----
- Each individual requires multiple INSERT statements. If they
were commited as separate transactions, it would take a lot of
time. Instead, the data of all individuals can be collected
and stored in a single transaction.
- The details attribute of each individual can contain an
evaluation key that refers to user-defined data returned by
the objective function. It is attempted to be JSON serialized
and in case it fails the data of this attribute is not stored
and an empty dictionary instead without warning.
References
----------
- https://www.sqlite.org/faq.html#q19
- https://docs.python.org/3/library/sqlite3.html#using-sqlite3-efficiently
- https://stackoverflow.com/questions/603572/escape-single-quote-character-for-use-in-an-sqlite-query
"""
# Note: Serialization is performed inline (not in several methods) to increase run speed,
# while deserialization after a run is performed in methods to increase modularity
# Argument processing
generation = str(generation)
label = str(label)
# Prepare queries
query1 = "INSERT OR IGNORE INTO phenotype_fitness_mapping VALUES (?, ?, ?);"
query2 = "INSERT OR IGNORE INTO genotype_phenotype_mapping VALUES (?, ?);"
query3 = "INSERT INTO search VALUES (?, ?, ?, ?, ?);"
# Prepare data
data1 = []
data2 = []
data3 = []
phe_known = set()
gt_known = set()
for ind in population:
# Serialization: genotype to str in system-specific format
gt = str(ind.genotype)
# Serialization: parent_ids to str in list format
try:
parent_ids = str(ind.details["parent_ids"])
except KeyError:
parent_ids = "[]"
# Data for table "search"
data3.append((ind.details["id"], str(parent_ids), generation, label, gt))
# Data for other tables
phe = ind.phenotype
if phe is not None:
if phe not in phe_known:
phe_known.add(phe)
# Serialization: details to None, JSON or str
details = ind.details["evaluation"]
if details is not None:
try:
details = _json.dumps(details)
except Exception:
details = str(details)
# Data for table "phenotype_fitness_mapping"
data1.append((phe, ind.fitness, details))
if gt not in gt_known:
# Data for table "genotype_phenotype_mapping"
gt_known.add(gt)
data2.append((gt, phe))
# Insert data
self._dbms.execute_query_for_many_records(query1, data1)
self._dbms.execute_query_for_many_records(query2, data2)
self._dbms.execute_query_for_many_records(query3, data3)
def _load_population(self, generation, with_parent_ids=True):
"""Load a population identified by its generation from the database.
The information in from the database is converted to suitable
Python objects on the level of individual attributes
(e.g. fitness is float), individual objects (type depends on
system) and population (type depends on system).
Raises
------
DatabaseError
If the loaded population is empty because the database does
not contain the user-provided generation.
"""
# Load database entries
query = 'SELECT * FROM full_search WHERE generation=? AND label="main";'
rows = self._dbms.execute_query(query, (generation,))
# Raise error if empty
if not rows:
_exceptions.raise_load_population_error(generation)
# Reconstruct population
return self._deserializer.population(rows)
def _store_database_subset(self, data):
"""Store a subset of data to the database."""
# Split data
data_search = data["search"]
data_gen_phe = data["genotype_phenotype_mapping"]
data_phe_fit = data["phenotype_fitness_mapping"]
# Table 1: search
query = "INSERT INTO search VALUES (?, ?, ?, ?, ?);"
try:
self._dbms.execute_query_for_many_records(query, data_search)
except _sqlite3.IntegrityError:
_exceptions.raise_individual_clash_error()
# Table 2: genotype_phenotype_mapping
query = "INSERT OR IGNORE INTO genotype_phenotype_mapping VALUES (?, ?);"
self._dbms.execute_query_for_many_records(query, data_gen_phe)
# Table 3: phenotype_fitness_mapping
query = "INSERT OR IGNORE INTO phenotype_fitness_mapping VALUES (?, ?, ?);"
self._dbms.execute_query_for_many_records(query, data_phe_fit)
def _load_database_subset(self, first_gen, last_gen):
"""Load a subset of data defined by first and last generation from the database."""
# Table 1: search
query = "SELECT * FROM search WHERE generation>=? AND generation<=?;"
search = self._dbms.execute_query(query, (first_gen, last_gen))
# Table 2: genotype_phenotype_mapping
query = (
"WITH chosen_genotypes AS ("
" SELECT DISTINCT genotype FROM search "
" WHERE generation>=? AND generation<=? "
") "
"SELECT * FROM genotype_phenotype_mapping WHERE genotype IN chosen_genotypes"
)
genotype_phenotype_mapping = self._dbms.execute_query(
query, (first_gen, last_gen)
)
# Table 3: phenotype_fitness_mapping
query = (
"WITH chosen_genotypes AS ("
" SELECT DISTINCT genotype FROM search "
" WHERE generation>=? AND generation<=? "
"), chosen_phenotypes AS ("
" SELECT DISTINCT phenotype FROM genotype_phenotype_mapping "
" WHERE genotype IN chosen_genotypes "
") "
"SELECT * FROM phenotype_fitness_mapping WHERE phenotype IN chosen_phenotypes"
)
phenotype_fitness_mapping = self._dbms.execute_query(
query, (first_gen, last_gen)
)
# Combine the results in a dictionary
data = dict(
search=search,
genotype_phenotype_mapping=genotype_phenotype_mapping,
phenotype_fitness_mapping=phenotype_fitness_mapping,
)
return data
def _load_database_full(self):
"""Load all data from the database."""
return self._load_database_subset(
self.generation_first(), self.generation_last()
)
# File I/O
def export_sql(self, filepath, ext="sqlite3"):
    """Export the evolutionary search by storing the current database to an SQL file.

    All tables and views are exported to a single SQL file that
    adheres to
    `SQLite version 3 <https://docs.python.org/3/library/sqlite3.html>`__.
    It can be used for a later import in order to continue and
    analyze a run, or it may be opened with external tools such
    as `DB Browser for SQLite <https://sqlitebrowser.org/>`__.

    Parameters
    ----------
    filepath : str
        The given filepath may automatically be modified in two
        ways:

        - It is ensured to end with the extension defined by the
          ``ext`` argument.
        - It is ensured to be a filepath that does not exist yet by
          adding a numerical suffix.
          Example: If ``some_file.sqlite3`` exists, it uses
          ``some_file_1.sqlite3`` or if that also exists then
          ``some_file_2.sqlite3`` and so on.
    ext : str
        The extension that the filepath is ensured to end with.

        - If ``None``, no extension is added.
        - If ``db``, the filepath is ensured to end with ``.db``.
        - If ``.sql``, the filepath is ensured to end with ``.sql``.

    Returns
    -------
    filepath_used : str
        The filepath the database was actually exported to.

    """
    os_utils = _utilities.operating_system

    # Argument processing: optional extension first, then a fresh path
    target = filepath if ext is None else os_utils.ensure_file_extension(filepath, ext)
    target = os_utils.ensure_new_path(target)

    # Export
    self._dbms.export_sql(target)
    return target
def export_csv(self, filepath, ext="csv"):
    """Export the database of this evolutionary search as CSV file.

    Only the main view, which gathers information from all
    individual tables, is exported to a single CSV file. As such
    it provides all information stored in the database in a
    redundant manner. Currently it can not be used for a later
    import, but it can be opened with external tools that can read
    CSV files, such as
    `LibreOffice Calc <https://www.libreoffice.org/discover/calc/>`__
    or
    `Tad <https://www.tadviewer.com>`__.

    Parameters
    ----------
    filepath : str
        The given filepath may automatically be modified in two ways:

        - It is ensured to end with the extension defined by the
          ``ext`` argument.
        - It is ensured to be a filepath that does not exist yet by
          adding a numerical suffix.
          Example: If ``some_file.csv`` exists, it uses
          ``some_file_1.csv`` or if that also exists then
          ``some_file_2.csv`` and so on.
    ext : str
        The extension that the filepath is ensured to end with.

        - If it is ``None``, no extension is added.
        - If it is ``csv``, the filepath is ensured to end with
          ``.csv``.
        - If it is ``.csv``, the filepath is ensured to end with
          ``.csv``.

    Returns
    -------
    filepath_used : str
        The filepath the view was actually exported to.

    """
    # Argument processing
    if ext is None:
        filepath_used = filepath
    else:
        filepath_used = _utilities.operating_system.ensure_file_extension(
            filepath, ext
        )
    filepath_used = _utilities.operating_system.ensure_new_path(filepath_used)

    # Export to the processed path. Passing the raw ``filepath`` here would
    # bypass the extension and no-overwrite handling and would make the
    # returned path differ from the file that was written.
    self._dbms.export_csv(filepath_used, name="full_search")
    return filepath_used
def import_sql(self, filepath, generation_range=None):
    """Import an evolutionary search by loading the SQL file of a previous run.

    Either all generations (default) or only a subset defined by the
    interval ``[first_generation, last_generation]`` can be loaded.

    Caution: The method :meth:`reset` is called, so that the current
    state and database are dropped and can be replaced in a clean
    fashion by new data from the SQL file.

    Parameters
    ----------
    filepath : str
        Filepath of an SQLite3 file exported by a previous run.
    generation_range : `tuple` of two `int`, optional
        The first and last generation to include in the import.
        If ``None``, the full range found in the file is used.
        A bound outside of the range found in the file triggers a
        warning and is replaced by the respective bound of the file.

    Raises
    ------
    ValueError
        If the chosen data loaded from the file is empty.

    Examples
    --------
    Using ``first_generation=0`` and ``last_generation=2`` loads the
    first three generations of a previous run. The last generation
    is reconstructed as population in memory, so that the search can
    be continued from this point. If :meth:`run_step()` is called,
    the last generation loaded from the database (2) is used to
    construct the next generation (3). The resulting search state
    can again be exported to an SQL file if desired.

    Notes
    -----
    NOTE(review): This method uses ``self.reset``, ``self.parameters``,
    ``self.state`` and ``self.database``, none of which is defined in
    the visible part of this class or listed in its ``__slots__``.
    It looks like it belongs to the algorithm object that owns a
    database - confirm where it is supposed to live.

    """
    # Argument processing
    requested_range = self._process_generation_range(generation_range)
    filepath = _ap.str_arg("filepath", filepath)
    if not _os.path.isfile(filepath):
        _exceptions.raise_import_database_error(filepath)

    # Reset
    self.reset()

    # Databases
    database_on = self.parameters.database_on
    source_db = _database.Database(filepath)
    target_db = self.database if database_on else None

    # Argument processing (with information from source database)
    first_gen_in_db = source_db.generation_first()
    last_gen_in_db = source_db.generation_last()
    if requested_range is None:
        # Default: import everything (unpacking None used to raise TypeError)
        first_gen, last_gen = first_gen_in_db, last_gen_in_db
    else:
        first_gen, last_gen = requested_range
    # Start from the requested bounds, so that both names are always bound;
    # they used to be assigned only when a warning was issued
    first_generation, last_generation = first_gen, last_gen
    if first_gen < first_gen_in_db or first_gen > last_gen_in_db:
        _warnings._warn_database_import_first(first_gen_in_db, first_gen)
        first_generation = first_gen_in_db
    if last_gen > last_gen_in_db or last_gen < first_gen_in_db:
        _warnings._warn_database_import_last(last_gen_in_db, last_gen)
        last_generation = last_gen_in_db

    empty_message = (
        'Tried to load the chosen data from SQL database "{}" but the '
        "resulting list was empty.".format(filepath)
    )

    # Load subset from old db and store it into new db
    if database_on:
        data = source_db._load_database_subset(first_generation, last_generation)
        if all(len(entries) == 0 for entries in data.values()):
            raise ValueError(empty_message)
        target_db._store_database_subset(data)

    # Reconstruct properties to determine current state
    with_parent_ids = (last_generation - first_generation) > 1
    # The visible signature is _load_population(generation, with_parent_ids);
    # the former extra leading "system" argument did not match it
    last_population = source_db._load_population(last_generation, with_parent_ids)
    if len(last_population) == 0:
        raise ValueError(empty_message)

    # NOTE(review): "individuals_with_min_fit" and "individuals_with_max_fit"
    # are not defined in the visible part of this class (__str__ calls
    # "individuals_with_min_fitness" without arguments). The calls are kept
    # unchanged because the intended signature cannot be confirmed here.
    min_inds = source_db.individuals_with_min_fit(
        self.parameters.system, first_generation, last_generation
    )
    min_ind = None if not min_inds else min_inds[0]
    max_inds = source_db.individuals_with_max_fit(
        self.parameters.system, first_generation, last_generation
    )
    max_ind = None if not max_inds else max_inds[0]
    max_id = source_db._individual_max_id([first_generation, last_generation])
    if not max_id:
        max_id = 0

    # Set current state, partly based on database content, partly fresh
    self.state.population = last_population
    self.state.generation = last_generation + 1
    self.state.num_generations = 0
    self.state.num_individuals = max_id + 1
    self.state.num_gen_to_phe_evaluations = 0
    self.state.num_phe_to_fit_evaluations = 0
    # NOTE(review): The choice used to be guarded by a hard-coded "if True",
    # so the individual with minimum fitness was always taken as the best
    # one. Presumably this should depend on the objective - confirm.
    self.state.best_individual = min_ind
    self.state.min_individual = min_ind
    self.state.max_individual = max_ind
def import_sql_evaluations(self, filepath, verbose=False):
    """Import only phenotype-to-fitness evaluation data from an SQL file.

    This method allows to load phenotype-to-fitness calculations
    from a previous run. It is relevant when the objective function
    is computationally demanding and prevention of some
    recalculations may speed up the search significantly.

    Parameters
    ----------
    filepath : str
        Filepath of an SQLite3 file exported by a previous run.
    verbose : bool, optional
        If ``True``, the number of loaded evaluations is printed.

    Notes
    -----
    Rows are inserted with ``INSERT OR IGNORE``. A warning is issued
    if the source database does not contain any evaluation.

    """
    # Argument processing
    filepath = _ap.str_arg("filepath", filepath)
    if not _os.path.isfile(filepath):
        _exceptions.raise_import_database_error(filepath)

    # Preparation of source and target database
    source_db = Database(filepath, system="")
    target_db = self

    # Load data from source database, warn if it is empty
    query = "SELECT * FROM phenotype_fitness_mapping;"
    phe_fit_evaluations = source_db._dbms.execute_query(query)
    if not phe_fit_evaluations:
        _warnings._warn_import_database_empty(filepath)

    # Store data in target database with one batch call instead of one
    # query per row, in the same way as the other bulk inserts of this class
    query = "INSERT OR IGNORE INTO phenotype_fitness_mapping VALUES (?, ?, ?);"
    target_db._dbms.execute_query_for_many_records(query, phe_fit_evaluations)

    # Optional report
    if verbose:
        message = (
            "Loaded {} phenotype-to-fitness evaluations from "
            "external database at {}."
        ).format(len(phe_fit_evaluations), filepath)
        print(message)
# Getting insights into stored information
# - Counts
[docs] def num_generations(self):
"""Get the number of generations stored in the database."""
# Query
query = "SELECT MAX(generation)-MIN(generation)+1 FROM search;"
result = self._dbms.execute_query(query)
# Check
value = result[0][0]
if value is None:
value = 0
return value
[docs] def num_individuals(self, generation_range=None, only_main=False):
"""Get the number of individuals stored in the database."""
# Note: individual_id is the primary key, hence counting does not require DISTINCT
# Argument processing
generation_range = self._process_generation_range(generation_range)
label = '"main"' if only_main else "label"
# Query
if generation_range is None:
query = (
"SELECT COUNT(individual_id) FROM search " "WHERE label={};"
).format(label)
result = self._dbms.execute_query(query)
else:
query = (
"SELECT COUNT(individual_id) FROM search "
"WHERE label={} AND generation BETWEEN ? AND ?;"
).format(label)
result = self._dbms.execute_query(query, generation_range)
return result[0][0]
[docs] def num_genotypes(self, generation_range=None, only_main=False):
"""Get the number of unique genotypes stored in the database."""
# Note: Null is not considered in 'SELECT COUNT(DISTINCT genotype)' but should never occur
# Argument processing
generation_range = self._process_generation_range(generation_range)
label = '"main"' if only_main else "label"
# Query
if generation_range is None:
query = (
"SELECT COUNT(DISTINCT genotype) FROM full_search " "WHERE label={};"
).format(label)
result = self._dbms.execute_query(query)
else:
query = (
"SELECT COUNT(DISTINCT genotype) FROM full_search "
"WHERE label={} AND generation BETWEEN ? AND ?;"
).format(label)
result = self._dbms.execute_query(query, generation_range)
return result[0][0]
[docs] def num_phenotypes(self, generation_range=None, only_main=False):
"""Get the number of unique phenotypes stored in the database."""
# Note: Null would not be considered in 'SELECT COUNT(DISTINCT phenotype)' but can occur
# Argument processing
generation_range = self._process_generation_range(generation_range)
label = '"main"' if only_main else "label"
# Query
if generation_range is None:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT phenotype FROM full_search"
" WHERE label={}"
");"
).format(label)
result = self._dbms.execute_query(query)
else:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT phenotype FROM full_search"
" WHERE label={} AND generation BETWEEN ? AND ?"
");"
).format(label)
result = self._dbms.execute_query(query, generation_range)
return result[0][0]
[docs] def num_fitnesses(self, generation_range=None, only_main=False):
"""Get the number of unique fitness values stored in the database.
NaN values are not counted. These values appear in individuals
that were not evaluated, e.g. those generated by crossover but
then modified by mutation before being evaluated and selected.
"""
# Note: Null (NaN) is not considered in 'SELECT COUNT(DISTINCT fitness)' but may occur
# Argument processing
generation_range = self._process_generation_range(generation_range)
label = '"main"' if only_main else "label"
# Query
if generation_range is None:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT fitness FROM full_search"
" WHERE label={}"
");"
).format(label)
result = self._dbms.execute_query(query)
else:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT fitness FROM full_search"
" WHERE label={} AND generation BETWEEN ? AND ?"
");"
).format(label)
result = self._dbms.execute_query(query, generation_range)
return result[0][0]
[docs] def num_details(self, generation_range=None, only_main=False):
"""Get the number of unique details stored in the database."""
# Note: Null would not be considered in 'SELECT COUNT(DISTINCT details)' but can occur
# Argument processing
generation_range = self._process_generation_range(generation_range)
label = '"main"' if only_main else "label"
# Query
if generation_range is None:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT details FROM full_search"
" WHERE label={}"
");"
).format(label)
result = self._dbms.execute_query(query)
else:
query = (
"SELECT COUNT(*) FROM ("
" SELECT DISTINCT details FROM full_search"
" WHERE label={} AND generation BETWEEN ? AND ?"
");"
).format(label)
result = self._dbms.execute_query(query, generation_range)
return result[0][0]
[docs] def num_gen_to_phe_evaluations(self):
"""Get the number of genotype-to-phenotype evaluations."""
query = "SELECT COUNT(*) FROM genotype_phenotype_mapping;"
result = self._dbms.execute_query(query)
return result[0][0]
[docs] def num_phe_to_fit_evaluations(self, only_unique=True):
"""Get the number of phenotype-to-fitness evaluations.
Note: It assumes that no phenotype was evaluated more than once,
which depends on the parametrization (cache and/or database
lookups).
"""
# Argument processing
column = "DISTINCT(t1.phenotype)" if only_unique else "t1.phenotype"
# Query
query = (
"SELECT COUNT({}) FROM genotype_phenotype_mapping AS t1 "
"LEFT JOIN phenotype_fitness_mapping AS t2 "
"ON t2.phenotype=t1.phenotype;"
).format(column)
result = self._dbms.execute_query(query)
return result[0][0]
# - Generation
[docs] def generation_first(self):
"""Get the first generation stored in the database.
Raises an error if the database does not contain any entries yet.
"""
# Query
query = "SELECT MIN(generation) FROM search;"
result = self._dbms.execute_query(query)
# Check
value = result[0][0]
if value is None:
_exceptions.raise_generation_first_error()
return value
[docs] def generation_last(self):
"""Get the last generation stored in the database.
Raises an error if the database does not contain any entries yet.
"""
# Query
query = "SELECT MAX(generation) FROM search;"
result = self._dbms.execute_query(query)
# Check
value = result[0][0]
if value is None:
_exceptions.raise_generation_last_error()
return value
def _process_generation_range(self, generation_range):
    """Process the user-provided generation range so it can be used safely in a query.

    Parameters
    ----------
    generation_range : None, int or iterable with two items
        - ``None`` is returned unchanged.
        - An ``int`` ``g`` becomes ``(g, g)``.
        - An iterable must hold exactly two items ``(first, last)``.
          An item that is ``None`` is replaced by the first or last
          generation stored in the database, or by ``0`` if that
          lookup fails with a ``DatabaseError``.

    Returns
    -------
    generation_range : None or tuple of two int

    Notes
    -----
    Strings and float items are rejected explicitly. These checks
    used to be ``assert`` statements, which are removed when Python
    runs with ``-O``; a string like ``"12"`` or a float bound would
    then have been converted silently. Every failure while processing
    an iterable is reported via ``raise_generation_range_error``.

    """
    _ap.check_arg(
        "generation_range", generation_range, (type(None), int, _Iterable)
    )
    if isinstance(generation_range, int):
        generation_range = (generation_range, generation_range)
    elif isinstance(generation_range, _Iterable):
        try:
            if isinstance(generation_range, str):
                raise TypeError("generation_range must not be a string")
            first, last = generation_range
            if first is None:
                try:
                    first = self.generation_first()
                except _exceptions.DatabaseError:
                    first = 0
            if last is None:
                try:
                    last = self.generation_last()
                except _exceptions.DatabaseError:
                    last = 0
            if isinstance(first, float) or isinstance(last, float):
                raise TypeError("generation bounds must not be floats")
            generation_range = (int(first), int(last))
        except Exception:
            # Any problem with the iterable is reported as one uniform error
            _exceptions.raise_generation_range_error()
    return generation_range
# - Individual
def _individual_max_id(self, generation_range=None, only_main=False):
    """Return the largest ``individual_id`` found in the ``search`` table.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are considered.
    only_main : bool
        If True, the label must equal ``"main"``. Otherwise the condition
        is ``label=label``, which does not restrict the label.

    If the query yields no value, the error handling is delegated to
    ``_exceptions.raise_ind_max_id_error`` (the method does not return
    None in that case).
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    base = "SELECT MAX(individual_id) FROM search WHERE label=" + label_expr
    if bounds is None:
        rows = self._dbms.execute_query(base + ";")
    else:
        rows = self._dbms.execute_query(
            base + " AND generation BETWEEN ? AND ?;", bounds
        )
    max_id = rows[0][0]
    if max_id is None:
        _exceptions.raise_ind_max_id_error()
    return max_id
def individuals(self, generation_range=None, only_main=False):
    """Return the individuals read from the ``full_search`` view.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.individuals``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    where = "WHERE label={}".format(label_expr)
    if bounds is None:
        call_args = ("SELECT * FROM full_search " + where + ";",)
    else:
        call_args = (
            "SELECT * FROM full_search " + where + " AND generation BETWEEN ? AND ?;",
            bounds,
        )
    rows = self._dbms.execute_query(*call_args)
    return self._deserializer.individuals(rows)
def individuals_with_given_fitness(
    self, fitness, generation_range=None, only_main=False
):
    """Return the individuals whose fitness equals the given value.

    Parameters
    ----------
    fitness
        Value bound to the ``fitness=?`` condition of the query.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.individuals``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    # Collect conditions and their bound values side by side
    conditions = ["label={}".format(label_expr)]
    params = []
    if bounds is not None:
        conditions.append("generation BETWEEN ? AND ?")
        params.extend(bounds)
    conditions.append("fitness=?")
    params.append(fitness)
    sql = "SELECT * FROM full_search WHERE {};".format(" AND ".join(conditions))
    rows = self._dbms.execute_query(sql, tuple(params))
    return self._deserializer.individuals(rows)
def individuals_with_min_fitness(self, generation_range=None, only_main=False):
    """Return all individuals that share the minimum fitness value.

    The minimum is obtained from ``fitness_min`` and then looked up with
    ``individuals_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        lowest = self.fitness_min(generation_range, only_main)
        return self.individuals_with_given_fitness(
            lowest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
def individuals_with_max_fitness(self, generation_range=None, only_main=False):
    """Return all individuals that share the maximum fitness value.

    The maximum is obtained from ``fitness_max`` and then looked up with
    ``individuals_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        highest = self.fitness_max(generation_range, only_main)
        found = self.individuals_with_given_fitness(
            highest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
    return found
def individuals_with_low_fitness(
    self, n=10, generation_range=None, only_main=False
):
    """Return up to n individuals, taken in order of ascending fitness.

    Rows without a fitness value (NULL) are excluded by the query.

    Parameters
    ----------
    n : int
        Value bound to the ``LIMIT`` clause.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    if bounds is None:
        generation_clause, params = "", (n,)
    else:
        generation_clause, params = "generation BETWEEN ? AND ? AND ", (*bounds, n)
    sql = (
        "SELECT * FROM full_search WHERE label={} AND {}fitness IS NOT NULL "
        "ORDER BY fitness ASC LIMIT ?;"
    ).format(label_expr, generation_clause)
    rows = self._dbms.execute_query(sql, params)
    return self._deserializer.individuals(rows)
def individuals_with_high_fitness(
    self, n=10, generation_range=None, only_main=False
):
    """Return up to n individuals, taken in order of descending fitness.

    Rows without a fitness value (NULL) are excluded by the query.

    Parameters
    ----------
    n : int
        Value bound to the ``LIMIT`` clause.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    head = "SELECT * FROM full_search WHERE label=" + label_expr
    tail = " AND fitness IS NOT NULL ORDER BY fitness DESC LIMIT ?;"
    if bounds is None:
        rows = self._dbms.execute_query(head + tail, (n,))
    else:
        rows = self._dbms.execute_query(
            head + " AND generation BETWEEN ? AND ?" + tail, (*bounds, n)
        )
    return self._deserializer.individuals(rows)
# - Population
def population_size_min(self):
    """Return the smallest per-generation count of rows labelled "main".

    The inner query counts the "main" rows of the ``search`` table per
    generation; the outer query takes the minimum of those counts. If
    no value results, the error handling is delegated to
    ``_exceptions.raise_pop_size_min_error``.
    """
    sql = "".join(
        (
            "SELECT MIN(population_size) FROM (",
            " SELECT COUNT(label) AS population_size FROM search ",
            ' WHERE label="main" GROUP BY generation',
            ");",
        )
    )
    rows = self._dbms.execute_query(sql)
    smallest = rows[0][0]
    if smallest is None:
        _exceptions.raise_pop_size_min_error()
    return smallest
def population_size_max(self):
    """Return the largest per-generation count of rows labelled "main".

    The inner query counts the "main" rows of the ``search`` table per
    generation; the outer query takes the maximum of those counts. If
    no value results, the error handling is delegated to
    ``_exceptions.raise_pop_size_max_error``.
    """
    pieces = [
        "SELECT MAX(population_size) FROM (",
        " SELECT COUNT(label) AS population_size FROM search ",
        ' WHERE label="main" GROUP BY generation',
        ");",
    ]
    rows = self._dbms.execute_query("".join(pieces))
    largest = rows[0][0]
    if largest is None:
        _exceptions.raise_pop_size_max_error()
    return largest
# - Genotype
def genotypes(self, generation_range=None, only_main=False):
    """Return the distinct genotypes read from the ``search`` table.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.genotypes``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    sql = "SELECT DISTINCT genotype FROM search WHERE label=" + label_expr
    if bounds is None:
        rows = self._dbms.execute_query(sql + ";")
    else:
        rows = self._dbms.execute_query(
            sql + " AND generation BETWEEN ? AND ?;", bounds
        )
    return self._deserializer.genotypes(rows)
def genotypes_with_given_fitness(
    self, fitness, generation_range=None, only_main=False
):
    """Return the distinct genotypes whose fitness equals the given value.

    Parameters
    ----------
    fitness
        Value bound to the ``fitness=?`` condition of the query.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.genotypes``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    if bounds is None:
        generation_clause = ""
        params = (fitness,)
    else:
        generation_clause = " AND generation BETWEEN ? AND ?"
        params = (*bounds, fitness)
    sql = "SELECT DISTINCT genotype FROM full_search WHERE label={}{} AND fitness=?;".format(
        label_expr, generation_clause
    )
    rows = self._dbms.execute_query(sql, params)
    return self._deserializer.genotypes(rows)
def genotypes_with_min_fitness(self, generation_range=None, only_main=False):
    """Return the distinct genotypes that share the minimum fitness value.

    The minimum is obtained from ``fitness_min`` and then looked up with
    ``genotypes_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        lowest = self.fitness_min(generation_range, only_main)
        return self.genotypes_with_given_fitness(
            lowest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
def genotypes_with_max_fitness(self, generation_range=None, only_main=False):
    """Return the distinct genotypes that share the maximum fitness value.

    The maximum is obtained from ``fitness_max`` and then looked up with
    ``genotypes_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        highest = self.fitness_max(generation_range, only_main)
        found = self.genotypes_with_given_fitness(
            highest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
    return found
# - Phenotype
def phenotypes(self, generation_range=None, only_main=False):
    """Return the distinct phenotypes read from the ``full_search`` view.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.phenotypes``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    select = "SELECT DISTINCT phenotype FROM full_search WHERE label={}".format(
        label_expr
    )
    if bounds is None:
        call_args = (select + ";",)
    else:
        call_args = (select + " AND generation BETWEEN ? AND ?;", bounds)
    rows = self._dbms.execute_query(*call_args)
    return self._deserializer.phenotypes(rows)
def phenotypes_with_given_fitness(
    self, fitness, generation_range=None, only_main=False
):
    """Return the distinct phenotypes whose fitness equals the given value.

    Parameters
    ----------
    fitness
        Value bound to the ``fitness=?`` condition of the query.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.phenotypes``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    conditions = ["label={}".format(label_expr)]
    params = []
    if bounds is not None:
        conditions.append("generation BETWEEN ? AND ?")
        params.extend(bounds)
    conditions.append("fitness=?")
    params.append(fitness)
    sql = "SELECT DISTINCT phenotype FROM full_search WHERE {};".format(
        " AND ".join(conditions)
    )
    rows = self._dbms.execute_query(sql, tuple(params))
    return self._deserializer.phenotypes(rows)
def phenotypes_with_min_fitness(self, generation_range=None, only_main=False):
    """Return the distinct phenotypes that share the minimum fitness value.

    The minimum is obtained from ``fitness_min`` and then looked up with
    ``phenotypes_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        lowest = self.fitness_min(generation_range, only_main)
        return self.phenotypes_with_given_fitness(
            lowest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
def phenotypes_with_max_fitness(self, generation_range=None, only_main=False):
    """Return the distinct phenotypes that share the maximum fitness value.

    The maximum is obtained from ``fitness_max`` and then looked up with
    ``phenotypes_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        highest = self.fitness_max(generation_range, only_main)
        found = self.phenotypes_with_given_fitness(
            highest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
    return found
# - Details (optionally returned by objective function during phenotype-fitness evaluation)
def details(self, generation_range=None, only_main=False):
    """Return the distinct ``details`` entries read from ``full_search``.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.multiple_details``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    sql = "SELECT DISTINCT details FROM full_search WHERE label=" + label_expr
    if bounds is None:
        rows = self._dbms.execute_query(sql + ";")
    else:
        rows = self._dbms.execute_query(
            sql + " AND generation BETWEEN ? AND ?;", bounds
        )
    return self._deserializer.multiple_details(rows)
def details_with_given_fitness(
    self, fitness, generation_range=None, only_main=False
):
    """Return the distinct ``details`` entries whose fitness equals the given value.

    Parameters
    ----------
    fitness
        Value bound to the ``fitness=?`` condition of the query.
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.multiple_details``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    if bounds is None:
        generation_clause = ""
        params = (fitness,)
    else:
        generation_clause = " AND generation BETWEEN ? AND ?"
        params = (*bounds, fitness)
    sql = "SELECT DISTINCT details FROM full_search WHERE label={}{} AND fitness=?;".format(
        label_expr, generation_clause
    )
    rows = self._dbms.execute_query(sql, params)
    return self._deserializer.multiple_details(rows)
def details_with_min_fitness(self, generation_range=None, only_main=False):
    """Return the distinct ``details`` entries that share the minimum fitness.

    The minimum is obtained from ``fitness_min`` and then looked up with
    ``details_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        lowest = self.fitness_min(generation_range, only_main)
        return self.details_with_given_fitness(
            lowest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
def details_with_max_fitness(self, generation_range=None, only_main=False):
    """Return the distinct ``details`` entries that share the maximum fitness.

    The maximum is obtained from ``fitness_max`` and then looked up with
    ``details_with_given_fitness``. If either call raises
    ``_exceptions.DatabaseError``, an empty list is returned.
    """
    try:
        highest = self.fitness_max(generation_range, only_main)
        found = self.details_with_given_fitness(
            highest, generation_range, only_main
        )
    except _exceptions.DatabaseError:
        return []
    return found
# - Fitness
def fitnesses(self, generation_range=None, only_main=False):
    """Return the distinct fitness values read from the ``full_search`` view.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are selected.
    only_main : bool
        If True, only rows labelled ``"main"`` are selected.

    The rows are converted by ``self._deserializer.fitnesses``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    select = "SELECT DISTINCT fitness FROM full_search WHERE label={}".format(
        label_expr
    )
    if bounds is None:
        call_args = (select + ";",)
    else:
        call_args = (select + " AND generation BETWEEN ? AND ?;", bounds)
    rows = self._dbms.execute_query(*call_args)
    return self._deserializer.fitnesses(rows)
def fitness_min(self, generation_range=None, only_main=False):
    """Return the minimum fitness value found in the ``full_search`` view.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are considered.
    only_main : bool
        If True, only rows labelled ``"main"`` are considered.

    The query result is passed through ``self._deserializer.fitness``.
    If the deserialized value is NaN, the error handling is delegated to
    ``_exceptions.raise_fitness_min_error``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    sql = "SELECT MIN(fitness) FROM full_search WHERE label=" + label_expr
    if bounds is None:
        rows = self._dbms.execute_query(sql + ";")
    else:
        rows = self._dbms.execute_query(
            sql + " AND generation BETWEEN ? AND ?;", bounds
        )
    lowest = self._deserializer.fitness(rows[0][0])
    # NaN is the only value that is unequal to itself
    if lowest != lowest:
        _exceptions.raise_fitness_min_error()
    return lowest
def fitness_max(self, generation_range=None, only_main=False):
    """Return the maximum fitness value found in the ``full_search`` view.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Normalized by ``_process_generation_range``. If the result is
        not None, only generations between its two bounds are considered.
    only_main : bool
        If True, only rows labelled ``"main"`` are considered.

    The query result is passed through ``self._deserializer.fitness``.
    If the deserialized value is NaN, the error handling is delegated to
    ``_exceptions.raise_fitness_max_error``.
    """
    bounds = self._process_generation_range(generation_range)
    label_expr = '"main"' if only_main else "label"
    where = "WHERE label={}".format(label_expr)
    if bounds is None:
        call_args = ("SELECT MAX(fitness) FROM full_search " + where + ";",)
    else:
        call_args = (
            "SELECT MAX(fitness) FROM full_search " + where + " AND generation BETWEEN ? AND ?;",
            bounds,
        )
    rows = self._dbms.execute_query(*call_args)
    highest = self._deserializer.fitness(rows[0][0])
    # NaN is the only value that is unequal to itself
    if highest != highest:
        _exceptions.raise_fitness_max_error()
    return highest
def fitness_min_after_num_evals(self, num_evaluations):
    """Return the minimum fitness among the first ``num_evaluations`` evaluations.

    The inner query left-joins ``genotype_phenotype_mapping`` with
    ``phenotype_fitness_mapping`` on the phenotype and limits the result
    to ``num_evaluations`` rows (it contains no ORDER BY clause); the
    outer query takes the minimum fitness of those rows.

    Parameters
    ----------
    num_evaluations : int
        Checked with ``_ap.int_arg`` using ``min_incl=1``.

    If the deserialized value is NaN, the error handling is delegated to
    ``_exceptions.raise_fitness_min_n_error``.
    """
    _ap.int_arg("num_evaluations", num_evaluations, min_incl=1)
    sql = "".join(
        (
            "SELECT MIN(fitness) from (",
            " SELECT fitness FROM genotype_phenotype_mapping AS t1 ",
            " LEFT JOIN phenotype_fitness_mapping AS t2 ",
            " ON t2.phenotype=t1.phenotype ",
            " LIMIT ?",
            ");",
        )
    )
    rows = self._dbms.execute_query(sql, (num_evaluations,))
    lowest = self._deserializer.fitness(rows[0][0])
    # NaN is the only value that is unequal to itself
    if lowest != lowest:
        _exceptions.raise_fitness_min_n_error()
    return lowest
def fitness_max_after_num_evals(self, num_evaluations):
    """Return the maximum fitness among the first ``num_evaluations`` evaluations.

    The inner query left-joins ``genotype_phenotype_mapping`` with
    ``phenotype_fitness_mapping`` on the phenotype and limits the result
    to ``num_evaluations`` rows (it contains no ORDER BY clause); the
    outer query takes the maximum fitness of those rows.

    Parameters
    ----------
    num_evaluations : int
        Checked with ``_ap.int_arg`` using ``min_incl=1``.

    If the deserialized value is NaN, the error handling is delegated to
    ``_exceptions.raise_fitness_max_n_error``.
    """
    _ap.int_arg("num_evaluations", num_evaluations, min_incl=1)
    pieces = [
        "SELECT MAX(fitness) from (",
        " SELECT fitness FROM genotype_phenotype_mapping AS t1 ",
        " LEFT JOIN phenotype_fitness_mapping AS t2 ",
        " ON t2.phenotype=t1.phenotype ",
        " LIMIT ?",
        ");",
    ]
    rows = self._dbms.execute_query("".join(pieces), (num_evaluations,))
    highest = self._deserializer.fitness(rows[0][0])
    # NaN is the only value that is unequal to itself
    if highest != highest:
        _exceptions.raise_fitness_max_n_error()
    return highest
# - Genotype-phenotype evaluations
def gen_to_phe_evaluations(self, num_evaluations=None):
    """Get genotype-to-phenotype evaluations that were performed during the search.

    All rows of ``genotype_phenotype_mapping`` are read, or only the
    first ``num_evaluations`` rows if that argument is not None. The
    rows are converted by ``self._deserializer.gt_phe_map``.

    Guaranteed:

    - The order of the list is the order of performed evaluations.

    Not guaranteed:

    - The same evaluations may have been performed multiple times during
      the run, depending on cache settings, which is not available as
      information in the database.
    """
    if num_evaluations is None:
        call_args = ("SELECT * FROM genotype_phenotype_mapping;",)
    else:
        call_args = (
            "SELECT * FROM genotype_phenotype_mapping LIMIT ?;",
            (num_evaluations,),
        )
    rows = self._dbms.execute_query(*call_args)
    return self._deserializer.gt_phe_map(rows)
# - Phenotype-fitness evaluations
def phe_to_fit_evaluations(self, num_evaluations=None, with_details=False):
    """Get phenotype-to-fitness evaluations that were performed during the search.

    Parameters
    ----------
    num_evaluations : None or int
        If not None, it is bound to a ``LIMIT`` clause of the query.
    with_details : bool
        If True, the ``details`` column is selected as well and the rows
        are converted by ``self._deserializer.phe_fit_det_map`` instead
        of ``self._deserializer.phe_fit_map``.

    Guaranteed:

    - The order of the list is the order of performed evaluations.
    - Genotype-phenotype pairs that were loaded from previous runs are
      not considered.

    Not guaranteed:

    - The same evaluations may have been performed multiple times during
      the run, depending on cache and database lookup settings, which is
      not available as information in the database.
    """
    selected = "DISTINCT(t1.phenotype), fitness"
    if with_details:
        selected += ", details"
    # genotype_phenotype_mapping drives the join, so that the order of phenotypes
    # is right and external data in phenotype_fitness_mapping has no influence.
    base = (
        "SELECT " + selected + " FROM genotype_phenotype_mapping AS t1 "
        "LEFT JOIN phenotype_fitness_mapping AS t2 "
        "ON t2.phenotype=t1.phenotype"
    )
    if num_evaluations is None:
        rows = self._dbms.execute_query(base + ";")
    else:
        rows = self._dbms.execute_query(base + " LIMIT ?;", (num_evaluations,))
    if with_details:
        return self._deserializer.phe_fit_det_map(rows)
    return self._deserializer.phe_fit_map(rows)
# Support for memoization of phenotype-fitness mappings
def _lookup_phenotype_evaluations(self, phenotypes):
    """Get phenotype-to-fitness evaluations for all known phenotypes in a given list.

    Parameters
    ----------
    phenotypes : sized iterable
        Each entry is converted with ``str`` and used as a bound value
        of an ``IN (...)`` condition on ``phenotype_fitness_mapping``.

    Returns
    -------
    list of tuple
        One ``(phenotype, (fitness, details))`` entry per row returned
        by the database, each field passed through the deserializer.

    Notes
    -----
    The values are queried in chunks of at most 999 entries, so that a
    single statement never exceeds the bound-variable limit of older
    SQLite versions. An empty input returns an empty list without
    issuing a query.

    References
    ----------
    - https://www.sqlite.org/limits.html
    - https://stackoverflow.com/questions/44012117/what-is-the-most-efficient-way-to-query-multiple-values-from-a-single-column-in
    """
    n_max = 999  # SQLITE_MAX_VARIABLE_NUMBER for SQLite versions prior to 3.32.0
    values = [str(phe) for phe in phenotypes]
    # Query chunk by chunk and collect the raw rows
    rows = []
    for start in range(0, len(values), n_max):
        chunk = values[start : start + n_max]
        query = "SELECT * FROM phenotype_fitness_mapping WHERE phenotype IN ({})".format(
            ",".join(["?"] * len(chunk))
        )
        rows.extend(self._dbms.execute_query(query, chunk))
    # Deserialize every raw row exactly once
    ph = self._deserializer.phenotype
    fi = self._deserializer.fitness
    de = self._deserializer.details
    return [(ph(row[0]), (fi(row[1]), de(row[2]))) for row in rows]
# Data representations
def to_list(self, generation_range=None, only_main=False):
    """Convert the database entries to a list of rows and add some derived information.

    The full list is built via ``_lookup_or_calc`` under the key
    ``"to_list"`` and afterwards narrowed by ``_filter_list`` according
    to ``generation_range`` and ``only_main``.

    Each row is a tuple of eleven fields: the eight columns read from
    the ``full_search`` view (deserialized where a deserializer method
    exists), followed by the derived ``genotype_length``,
    ``phenotype_length`` and ``rank``. The list contains data rows only;
    the matching column names are provided by ``to_columns``.

    The two lengths are taken from the raw database values (0 if a value
    has no length, e.g. None). The rank is the position of a row within
    its run of consecutive rows that share generation and label, when
    that run is sorted by ascending fitness.
    """

    def build_rows():
        deser = self._deserializer
        rows = self._dbms.execute_query("SELECT * FROM full_search;")

        def length_or_zero(obj):
            try:
                return len(obj)
            except TypeError:
                return 0

        # Deserialize entries and derive new information, replacing rows in place
        for idx, raw in enumerate(rows):
            rows[idx] = [
                raw[0],  # individual_id
                deser.parent_ids(raw[1]),  # parent_ids
                raw[2],  # generation
                raw[3],  # label
                deser.genotype(raw[4]),  # genotype
                deser.phenotype(raw[5]),  # phenotype
                deser.fitness(raw[6]),  # fitness
                deser.details(raw[7]),  # details
                length_or_zero(raw[4]),  # genotype_length (of the raw value)
                length_or_zero(raw[5]),  # phenotype_length (of the raw value)
                -1,  # rank, filled in below
            ]

        def rank_group(end_idx, group_fitnesses):
            # The group occupies the rows directly before end_idx.
            if not group_fitnesses:
                return
            offsets = range(len(group_fitnesses), 0, -1)
            ordered = sorted(zip(offsets, group_fitnesses), key=lambda pair: pair[1])
            for rank, (offset, _) in enumerate(ordered):
                rows[end_idx - offset][10] = rank

        # A new group starts whenever generation or label changes
        previous_generation = "invalid generation"
        previous_label = "invalid label"
        group = []
        for idx, row in enumerate(rows):
            generation, label = row[2], row[3]
            if generation != previous_generation or label != previous_label:
                rank_group(idx, group)
                group = []
                previous_generation = generation
                previous_label = label
            group.append(row[6])
        if rows:
            # The last group is closed by the end of the data
            rank_group(len(rows), group)
        return [tuple(row) for row in rows]

    # Calculation or lookup in cache
    data = self._lookup_or_calc("to_list", build_rows)
    # Optional filtering
    return self._filter_list(data, generation_range, only_main)
def to_columns(self):
    """Return the names of the eleven fields that make up a data row.

    This tuple is the primary source of truth about the columns in the
    data.
    """
    return (
        "individual_id",
        "parent_ids",
        "generation",
        "label",
        "genotype",
        "phenotype",
        "fitness",
        "details",
        "genotype_length",
        "phenotype_length",
        "rank",
    )
def to_dataframe(self, generation_range=None, only_main=False):
    """Convert the database entries to a Pandas DataFrame.

    The rows come from ``to_list`` and the column names from
    ``to_columns``, so the derived fields are included.

    Parameters
    ----------
    generation_range : None, int, or iterable with two entries
        Passed on to ``to_list``.
    only_main : bool
        Passed on to ``to_list``. If True, only main populations are
        kept and intermediate ones (selected parents, crossed-over or
        mutated populations) are filtered out. Note that the
        crossed-over population is not evaluated if a mutation operator
        is provided, so that they have no associated phenotype and
        fitness values.

    References
    ----------
    - https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.total_changes
    """
    import pandas

    rows = self.to_list(generation_range, only_main)
    names = self.to_columns()
    return pandas.DataFrame(rows, columns=names)
def to_network(self, generation_range=None, only_main=False):
    """Convert the database entries to a NetworkX graph with derived attributes.

    Nodes and edges come from ``_to_graph``. Every node is keyed by its
    first field and carries one attribute per node column plus a
    ``hover`` text listing all of them. Every edge connects the first
    two fields of an edge row and carries the remaining edge columns as
    attributes. The graph is an undirected ``networkx.Graph``.
    """
    import networkx as nx

    nodes, edges = self._to_graph(generation_range, only_main)
    graph = nx.Graph()

    # Nodes
    node_fields = self.to_columns() + ("x", "y", "z", "form", "color", "size")
    for node in nodes:
        attributes = dict(zip(node_fields, node))
        attributes["hover"] = "\n".join(
            "{}: {}".format(name, value) for name, value in zip(node_fields, node)
        )
        graph.add_node(node[0], **attributes)

    # Edges: the first field (parent_id) only serves as an endpoint
    edge_fields = (
        "parent_id",
        "individual_id",
        "individual_x",
        "parent_x",
        "individual_y",
        "parent_y",
        "individual_z",
        "parent_z",
        "individual_label",
        "style",
        "color",
        "width",
        "individual_generation",
    )
    attribute_fields = edge_fields[1:]
    for edge in edges:
        graph.add_edge(edge[0], edge[1], **dict(zip(attribute_fields, edge[1:])))
    return graph
def to_jgf(self, generation_range=None, only_main=False):
    """Convert the data into JSON graph format for visualization.

    Returns a dictionary of the form
    ``{"graph": {"directed": False, "nodes": [...], "edges": [...]}}``.
    Each node has an ``id`` and ``metadata`` with scaled coordinates,
    color, size and a hover text (phenotype and fitness). Each edge has
    ``source``, ``target`` and ``metadata`` with size and color.
    """
    nodes, edges = self._to_graph(generation_range, only_main)
    node_fields = self.to_columns() + ("x", "y", "z", "form", "color", "size")
    hover_fields = ["phenotype", "fitness"]

    def as_jgf_node(row):
        hover = "\n".join(
            "{}: {}".format(name.title(), value)
            for name, value in zip(node_fields, row)
            if name in hover_fields
        )
        # Adapt coordinates (which fit only for Matplotlib and Plotly)
        return {
            "id": row[0],
            "metadata": {
                "x": row[11] * 100,
                "y": row[12] * -25,
                "z": row[13] * 5,
                "color": row[15],
                "size": row[16],
                "hover": hover,
            },
        }

    def as_jgf_edge(row):
        # Edge rows: parent_id at 0, individual_id at 1, color at 10, width at 11
        return {
            "source": row[0],
            "target": row[1],
            "metadata": {
                "size": row[11],
                "color": row[10],
            },
        }

    return {
        "graph": {
            "directed": False,
            "nodes": [as_jgf_node(row) for row in nodes],
            "edges": [as_jgf_edge(row) for row in edges],
        }
    }
def _to_graph(self, generation_range, only_main):
    """Build the node and edge rows of the genealogy graph.

    The unfiltered data from ``to_list`` is extended by node and edge
    information, which is computed by ``_to_nodes`` / ``_to_edges`` or
    taken from the cache via ``_lookup_or_calc`` (key ``"_to_graph"``).
    The result is narrowed by ``_filter_graph``.

    Returns
    -------
    tuple
        ``(nodes, edges)`` where each node is a data row concatenated
        with its node information.
    """
    data = self.to_list()

    def compute(rows):
        # Only information that is not part of the given data
        largest_population = self.population_size_max()
        node_info = self._to_nodes(rows, largest_population)
        edge_info = self._to_edges(rows, node_info)
        return node_info, edge_info

    node_info, edges = self._lookup_or_calc("_to_graph", compute, data)
    # Merge given and calculated/cached data
    nodes = [data[position] + node_info[position] for position in range(len(data))]
    # Optional filtering
    nodes, edges = self._filter_graph(
        data, nodes, edges, generation_range, only_main
    )
    return nodes, edges
def _to_nodes(self, data, population_size_max):
    """Derive plotting information for one graph node per data row.

    Parameters
    ----------
    data : list of rows
        Rows as produced by ``to_list`` (generation at index 2, label at
        3, phenotype at 5, fitness at 6, rank at 10).
    population_size_max : number
        Vertical offset added to the y coordinate of rows labelled
        "parent_selection", "crossover" or "mutation".

    Returns
    -------
    list of tuple
        One ``(x, y, z, form, color, size)`` entry per row. By default
        x is the generation, y the rank and z the fitness.
    """
    base_size = 3
    base_form = "o"
    base_color = "black"
    new_phenotype_color = "#00CC00"
    x_offset = 0.0
    y_offset = population_size_max
    z_offset = 0.0
    x_stretch = 0.85
    # Horizontal shift to the left for each intermediate population
    x_shift = {
        "parent_selection": 2.0 / 4.0 + 1.0 / 4.0 * x_stretch - x_offset,
        "crossover": 2.0 / 4.0 - x_offset,
        "mutation": 2.0 / 4.0 - 1.0 / 4.0 * x_stretch - x_offset,
    }

    nodes = []
    known_phenotypes = set()
    for row in data:
        label = row[3]
        phenotype_text = str(row[5])
        x, y, z = row[2], row[10], row[6]
        size, form, color = base_size, base_form, base_color
        # Highlight a phenotype that no earlier "main" row has shown
        if label in ("main", "mutation") and phenotype_text not in known_phenotypes:
            color = new_phenotype_color
        if label == "main":
            size = size * 2.0
            known_phenotypes.add(phenotype_text)
        elif label in x_shift:
            # Intermediate populations sit left of and above the main one
            x -= x_shift[label]
            y *= 0.5
            y += y_offset
            z += z_offset
        nodes.append((x, y, z, form, color, size))
    return nodes
def _to_edges(self, data, nodes):
    """Derive one graph edge per known parent of every individual.

    Parameters
    ----------
    data : list of rows
        Rows as produced by ``to_list``.
    nodes : list of tuple
        Node information as produced by ``_to_nodes``, aligned with
        ``data`` by position.

    Returns
    -------
    list of tuple
        Edges of the form ``(parent_id, individual_id, individual_x,
        parent_x, individual_y, parent_y, individual_z, parent_z,
        individual_label, style, color, width, individual_generation)``.
        A parent id that has not appeared in an earlier (or the same)
        row is skipped.
    """
    base_color = "black"
    base_width = 0.2
    base_style = "solid"

    edges = []
    position_of = {}
    for position, entry in enumerate(data):
        # Data fields at 0-10, followed by x, y, z, form, color, size at 11-16
        child = entry + nodes[position]
        child_id = child[0]
        child_label = child[3]
        position_of[child_id] = position
        for parent_id in child[1]:
            if parent_id not in position_of:
                continue
            parent_position = position_of[parent_id]
            parent = data[parent_position] + nodes[parent_position]
            parent_label = parent[3]

            color, width, style = base_color, base_width, base_style
            # Thicken identity relations from main population to main population
            if parent_label == "main" and child_label == "main":
                width *= 4
            # Color inheritance relations that changed the genotype
            if child_label in ("crossover", "mutation") and child[4] != parent[4]:
                color = "#EE0000"
                width *= 2
            # Let an edge into the main population take over the node color
            if parent_label != "main" and child_label == "main":
                color = child[15]

            edges.append(
                (
                    parent_id,
                    child_id,
                    child[11],
                    parent[11],
                    child[12],
                    parent[12],
                    child[13],
                    parent[13],
                    child_label,
                    style,
                    color,
                    width,
                    child[2],
                )
            )
    return edges
def _filter_list(self, data, generation_range, only_main):
    """Filter data rows by generation range and/or by the "main" label.

    Parameters
    ----------
    data : list of rows
        Rows with the generation at index 2 and the label at index 3.
    generation_range : None, int, or iterable with two entries
        ``None``, ``(None, None)`` and ``[None, None]`` mean "no
        generation filter". Anything else is normalized by
        ``_process_generation_range`` and both bounds are inclusive.
    only_main : bool
        If True, only rows labelled "main" are kept.

    Returns
    -------
    list
        The given object itself if it is empty or no filter applies,
        otherwise a new list with the rows that were kept.
    """
    use_generation_filter = generation_range not in (
        None,
        (None, None),
        [None, None],
    )
    if use_generation_filter:
        lower, upper = self._process_generation_range(generation_range)
    if not data or not (only_main or use_generation_filter):
        return data

    def is_kept(row):
        if only_main and row[3] != "main":
            return False
        if use_generation_filter and not (row[2] >= lower and row[2] <= upper):
            return False
        return True

    return [row for row in data if is_kept(row)]
def _filter_graph(self, data, nodes, edges, generation_range=None, only_main=False):
    """Filter node and edge rows by generation range and/or "main" label.

    Parameters
    ----------
    data : list of rows
        Only used to detect the empty case.
    nodes : list of rows
        Rows with id at index 0, generation at 2 and label at 3.
    edges : list of rows
        Rows with the two connected ids at index 0 and 1.
    generation_range : None, int, or iterable with two entries
        ``None``, ``(None, None)`` and ``[None, None]`` mean "no
        generation filter". Anything else is normalized by
        ``_process_generation_range``.
    only_main : bool
        If True, only nodes labelled "main" are kept.

    Returns
    -------
    tuple
        ``(nodes, edges)``. With a generation filter but without
        ``only_main``, the first generation of the range contributes
        only its "main" nodes, later generations contribute all nodes.
        An edge is kept only if both of its ids belong to kept nodes.
    """
    use_generation_filter = generation_range not in (
        None,
        (None, None),
        [None, None],
    )
    if use_generation_filter:
        lower, upper = self._process_generation_range(generation_range)
    if not data or not (only_main or use_generation_filter):
        return nodes, edges

    def is_kept(row):
        generation, label = row[2], row[3]
        if use_generation_filter and only_main:
            return label == "main" and generation >= lower and generation <= upper
        if use_generation_filter:
            return (generation == lower and label == "main") or (
                generation > lower and generation <= upper
            )
        return label == "main"

    kept_nodes = []
    kept_ids = set()
    for node in nodes:
        if is_kept(node):
            kept_ids.add(node[0])
            kept_nodes.append(node)
    kept_edges = [
        edge for edge in edges if edge[0] in kept_ids and edge[1] in kept_ids
    ]
    return kept_nodes, kept_edges
def plot_genealogy(
    self, backend="vis", generation_range=None, only_main=False, **kwargs
):
    """Create a genealogy plot.

    It shows the relationships between all individuals created
    throughout a run.

    Parameters
    ----------
    backend : str
        One of "d3", "vis" or "three" (checked with ``_ap.str_arg``).
    generation_range, only_main
        Passed on to ``to_jgf`` to select the plotted data.
    **kwargs
        Passed on to ``_plots.genealogy``. ``edge_curvature`` defaults
        to 0.0 and ``show_node_label`` to False if not given.
    """
    backend = _ap.str_arg("backend", backend, vals=("d3", "vis", "three"))
    # Defaults for options the caller did not provide
    kwargs.setdefault("edge_curvature", 0.0)
    kwargs.setdefault("show_node_label", False)
    graph = self.to_jgf(generation_range, only_main)
    return _plots.genealogy(graph, backend, **kwargs)
# Caching
def _lookup_or_calc(self, key, calc_func, *calc_args):
"""Look up a result in the cache or calculate it.
If it is not availabe yet in the cache, calculate it once and
store it for later reuse.
"""
num_changes = self._dbms.get_num_changes()
try:
# Cache lookup
assert self._cache[key]["num_changes"] == num_changes
result = self._cache[key]["data"]
result = _copy.copy(
result
) # prevent shallow modification of original, deep too slow
except (AssertionError, KeyError):
# Calculation
result = calc_func(*calc_args)
self._cache[key] = {"num_changes": num_changes, "data": result}
return result
class Deserializer:
    """Convert database entries back to Python objects.

    The deserializer holds a reference to a G3P system object and uses its
    ``representation`` attribute (``Genotype``, ``Individual``,
    ``Population``) to build the resulting objects.

    Methods with a plural name expect an iterable of database rows and
    read the value(s) from the leading column(s) of each row.
    """

    __slots__ = ("_system",)

    def __init__(self, system):
        """Create a deserializer that knows the chosen G3P system."""
        self._system = system

    def individual_id(self, data):
        """Get the id of an individual as an ``int``."""
        return int(data)

    def parent_ids(self, data):
        """Get the ids of parent individuals.

        ``None`` yields an empty list, anything else is parsed as JSON.
        """
        if data is None:
            return []
        return _json.loads(data)

    def genotype(self, data):
        """Reconstruct a genotype.

        This generates a Genotype object. Its exact type depends on the
        grammar-based genetic programming system being used.
        """
        return self._system.representation.Genotype(data)

    def genotypes(self, data):
        """Reconstruct a list of genotypes."""
        return [self.genotype(row[0]) for row in data]

    def phenotype(self, data):
        """Reconstruct a phenotype.

        ``None`` is mapped to an empty string, other values are returned
        unchanged.
        """
        if data is None:
            return ""
        return data

    def phenotypes(self, data):
        """Reconstruct a list of phenotypes."""
        return [self.phenotype(row[0]) for row in data]

    def fitness(self, data):
        """Reconstruct a fitness value.

        ``None`` is mapped to NaN, other values are returned unchanged.
        """
        if data is None:
            return float("nan")
        return data

    def fitnesses(self, data):
        """Reconstruct a list of fitness values."""
        return [self.fitness(row[0]) for row in data]

    def details(self, data):
        """Reconstruct a details object.

        ``None`` stays ``None``. Otherwise the value is parsed as JSON and,
        if that fails, returned unchanged.
        """
        # None
        if data is None:
            return None
        # JSON
        try:
            return _json.loads(data)
        except Exception:
            # Deliberate best effort: whatever can not be parsed as JSON
            # is handed back as it was stored (e.g. a plain str).
            return data

    def multiple_details(self, data):
        """Reconstruct a list of details objects."""
        return [self.details(row[0]) for row in data]

    def gt_phe_map(self, data):
        """Reconstruct genotype-to-phenotype mappings.

        Returns a list of ``(genotype, phenotype)`` tuples.
        """
        return [(self.genotype(row[0]), self.phenotype(row[1])) for row in data]

    def phe_fit_map(self, data):
        """Reconstruct phenotype-to-fitness mappings.

        Returns a list of ``(phenotype, fitness)`` tuples.
        """
        return [(self.phenotype(row[0]), self.fitness(row[1])) for row in data]

    def phe_fit_det_map(self, data):
        """Reconstruct phenotype-to-fitness-and-details mappings.

        Returns a list of ``(phenotype, fitness, details)`` tuples.
        """
        return [
            (self.phenotype(row[0]), self.fitness(row[1]), self.details(row[2]))
            for row in data
        ]

    def individual(self, data, without_parent_ids=False):
        """Reconstruct an individual.

        This generates an Individual object. Its exact type depends on
        the grammar-based genetic programming system being used.

        Parameters
        ----------
        data : sequence of eight values
            Unpacked as individual id, parent ids, generation, label,
            genotype, phenotype, fitness and details. Generation and label
            are not used for the created object.
        without_parent_ids : bool
            If true, the stored parent ids are ignored and the individual
            gets an empty list of parent ids.

        """
        # Split row into values
        ind_id, par_ids, _gnr, _lab, gt, phe, fit, det = data
        # Special case: do not load parent ids
        if without_parent_ids:
            par_ids = None
        # Object creation: each database value is first converted to a
        # suitable Python object; the phenotype is passed on as stored
        return self._system.representation.Individual(
            genotype=self.genotype(gt),
            phenotype=phe,
            fitness=self.fitness(fit),
            details=dict(
                id=self.individual_id(ind_id),
                parent_ids=self.parent_ids(par_ids),
                evaluation=self.details(det),
            ),
        )

    def individuals(self, data):
        """Reconstruct a list of individuals, including their parent ids."""
        # The system must not be passed as second argument: it would be
        # interpreted as a truthy ``without_parent_ids`` flag and drop
        # the parent ids of every individual.
        return [self.individual(row) for row in data]

    def population(self, data):
        """Reconstruct a population.

        This generates a Population object. Its exact type depends on
        the grammar-based genetic programming system being used.
        """
        individuals = self.individuals(data)
        return self._system.representation.Population(individuals)