"""Forward mapping function for WHGE.
Note that a reverse function for WHGE is supposedly
not possible in full generality.
"""
from ... import _grammar
from ... import exceptions as _exceptions
from ..._utilities.parametrization import get_given_or_default as _get_given_or_default
from . import _cached_calculations
from . import default_parameters as _dp
from . import representation as _representation
# Shortcuts for brevity and minor speedup
_GT = _representation.Genotype
_DT = _grammar.data_structures.DerivationTree
_NT = _grammar.data_structures.NonterminalSymbol
def _fe(me, re):
if re:
_exceptions.raise_max_expansion_error(me)
# Forward mapping
[docs]def forward(
grammar,
genotype,
parameters=None,
raise_errors=True,
return_derivation_tree=False,
verbose=False,
):
"""Map a WHGE genotype to a string phenotype.
Parameters
----------
grammar : `~alogos.Grammar`
genotype : `~.representation.Genotype` or data that can be converted to it
parameters : `dict` or `~alogos._utilities.parametrization.ParameterCollection`, optional
Following keyword-value pairs are considered by this function:
- ``max_expansions`` (`int`): Maximum number of nonterminal
expansions allowed in the derivation created by the mapping
process.
- ``max_depth`` (`int`): Maximum tree depth. This is considered
only in calculating the expressive power of different rules.
raise_errors : `bool`, optional
Possible values:
- `True`: A mapping error will be raised if a derivation is
not finished within a limit provided in the parameters.
- `False`: A partial derivation is allowed. In this case, the
returned string will contain unexpanded nonterminal symbols.
Therefore it is not a valid phenotype, i.e. not a string of
the grammar's language but a so-called sentential form.
return_derivation_tree : `bool`, optional
If `True`, not only the phenotype is returned but additionally
also the derivation tree.
verbose : `bool`, optional
If `True`, output about steps of the mapping process is printed.
Returns
-------
phenotype : `str`
If ``return_derivation_tree`` is `False`, which is the default.
(phenotype, derivation_tree) : `tuple` with two elements of type `str` and `~alogos._grammar.data_structures.DerivationTree`
If ``return_derivation_tree`` is `True`.
Raises
------
MappingError
If ``raise_errors`` is `True` and the mapping process can not
generate a full derivation before reaching a limit provided in
the parameters.
"""
# Parameter extraction
me = _get_given_or_default("max_expansions", parameters, _dp)
md = _get_given_or_default("max_depth", parameters, _dp)
# Argument processing
if not isinstance(genotype, _GT):
genotype = _GT(genotype)
# Cache look-up
sd = grammar._lookup_or_calc(
"whge", "sd", _cached_calculations.shortest_distances, grammar
)
ep = grammar._lookup_or_calc(
"whge", "ep_d{}".format(md), _cached_calculations.expressive_powers, grammar, md
)
# Mapping
if verbose:
dt = _forward_slow(grammar, genotype.data, me, sd, ep, raise_errors, verbose)
else:
dt = _forward_fast(grammar, genotype.data, me, sd, ep, raise_errors)
# Conditional return
phe = dt.string()
if return_derivation_tree:
return phe, dt
return phe
def _forward_fast(gr, gt, me, sd, ep, re):
"""Calculate the genotype-to-phenotype map of WHGE in a fast way."""
dt = _DT(gr)
x = dt._expand
a1 = gt.count()
st = [(dt.root_node, gt)]
p = st.pop
a = st.append
w = gr.production_rules
ec = 0
while st:
if me is not None and ec >= me:
_fe(me, re)
break
nd, gt = p()
sy = nd.symbol
n1 = gt.count()
b = len(gt)
rs = w[sy]
nr = len(rs)
if nr == 1:
r = 0
elif b >= nr:
lb = b // nr
la = lb + 1
m = b % nr
gs = []
s = 0
for i in range(nr):
e = s + (la if i < m else lb)
gs.append(gt[s:e])
s = e
v = []
d = -1.0
for j, g in enumerate(gs):
c = g.count() / len(g)
if c == d:
v.append(j)
elif c > d:
d = c
v = [j]
r = v[n1 % len(v)]
else:
c = n1 if b > 0 else a1
v = sd[sy]
r = v[c % len(v)]
z = x(nd, rs[r])
u = len(z)
ec += 1
if u > b:
f = [gt[0:0] for _ in z]
else:
e = [ep[n.symbol] if isinstance(n.symbol, _NT) else 0 for n in z]
t = sum(e)
if t > 0.0:
y = [int(h / t * b) for h in e]
else:
y = [0 for _ in e]
q = sum(y)
if q > 0:
ms = int(b / q)
nr = b - ms * q
y = [x * ms for x in y]
else:
nr = b - q
for c in range(nr):
y[c % u] += 1
f = []
s = 0
for n in y:
e = s + n
f.append(gt[s:e])
s = e
for k in range(u):
if len(f[k]) == b:
f[k] = f[k][:-1]
break
for i in range(u):
if isinstance(z[i].symbol, _NT):
a((z[i], f[i]))
return dt
def _forward_slow(
grammar,
genotype,
max_expansions,
shortest_distances,
expressive_powers,
raise_errors,
verbose,
):
"""Calculate the genotype-to-phenotype map of WHGE in a slow way.
This is a readable implementation of the mapping process, which
also allows to print output about the steps it involves.
It served as basis for the faster, minified implementation in this
module and may be helpful in understanding, replicating or modifying
the algorithm.
"""
derivation_tree = _grammar.data_structures.DerivationTree(grammar)
stack = [(derivation_tree.root_node, genotype)]
ones_in_full_genotype = genotype.count()
expansion_counter = 0
if verbose:
header = "Start of the derivation"
print(header)
print("=" * len(header))
print(
"- Get start symbol of the grammar: <{}>".format(grammar.start_symbol.text)
)
print()
print("Sentential form: {}".format(derivation_tree.string()))
print()
while stack:
# Check stop conditions (not part of paper, but of all other mappings)
if max_expansions is not None and expansion_counter >= max_expansions:
if verbose:
header = "Stop condition fulfilled"
print(header)
print("=" * len(header))
print(
"- The maximum number of expansions is reached: {}".format(
max_expansions
)
)
print()
if raise_errors:
_exceptions.raise_max_expansion_error(max_expansions)
break
# 1) Choose nonterminal: WHGE is order-independent, simply use leftmost
chosen_nt_idx = 0
chosen_nt_node, current_genotype = stack.pop(chosen_nt_idx)
current_symbol = chosen_nt_node.symbol
ones_in_current_genotype = current_genotype.count()
if verbose:
header = "Expansion {}".format(expansion_counter + 1)
print(header)
print("=" * len(header))
print("- Choice of nonterminal to expand")
print(" Index: {} by always using leftmost".format(chosen_nt_idx))
print(" Symbol s: <{}>".format(chosen_nt_node.symbol.text))
print(
' Genotype substring g\': "{}"'.format(
_bitarray_to_str(current_genotype)
)
)
# 2) Choose rule: WHGE decides with full information of a sub-genotype, not a single codon
rules = _rules_for(grammar, current_symbol)
num_bits = len(current_genotype)
num_rules = len(rules)
if verbose:
print("- Choice of rule to apply")
print(" Calculation steps")
print(" R_s = RulesFor(s)")
operator = ">=" if num_bits >= num_rules else "<"
print(
" Number of bits |g'|: {} {} Number of rules |R_s|: {}".format(
num_bits, operator, num_rules
)
)
if num_rules == 1:
chosen_rule_idx = 0
if verbose:
print(
" Index: {} by using the only available rule without "
"considering the genotype substring".format(chosen_rule_idx)
)
elif num_bits >= num_rules:
new_genotypes_for_rule_choice = _split_for_rule(current_genotype, rules)
if verbose:
print(" g\"_i = SplitForRule(g', |R_s|)")
for i, gen in enumerate(new_genotypes_for_rule_choice):
print(
' g"_{}: "{}" with {} bits'.format(
i, _bitarray_to_str(gen), len(gen)
)
)
print(' LargestCardIndex(g"_i)')
chosen_rule_idx = _largest_card_index(
new_genotypes_for_rule_choice, ones_in_current_genotype
)
if verbose:
print(
" Index: {} by calculation with SplitForRule and "
"LargestCardIndex".format(chosen_rule_idx)
)
else:
if verbose:
print(" ShortestRuleIndex(R_s)")
chosen_rule_idx = _shortest_rule_index(
current_genotype,
current_symbol,
shortest_distances,
ones_in_current_genotype,
ones_in_full_genotype,
)
if verbose:
print(
" Index: {} by calculation with ShortestRuleIndex".format(
chosen_rule_idx
)
)
# 3) Expand the chosen nonterminal (1) with the rhs of the chosen rule (2)
if verbose:
print("- Application of rule to nonterminal")
print(" ApplyRule(R_s, i={})".format(chosen_rule_idx))
new_nodes = _apply_rule(rules, chosen_rule_idx, derivation_tree, chosen_nt_node)
expansion_counter += 1
# 4) Calculate sub-genotypes belonging to each new nonterminal and terminal symbol
if verbose:
print("- Distribution of bits to new child nodes")
print(" g'_i = SplitForChildren()")
new_genotypes_for_children = _split_for_children(
current_genotype, new_nodes, expressive_powers
)
for i, child_genotype in enumerate(new_genotypes_for_children):
if child_genotype == current_genotype:
new_genotypes_for_children[i] = _drop_trailing_bit(child_genotype)
if verbose:
print(
' g\'_{}: "{}" by DropTrailingBit("{}")'.format(
i,
_bitarray_to_str(new_genotypes_for_children[i]),
_bitarray_to_str(child_genotype),
)
)
else:
if verbose:
print(
' g\'_{}: "{}"'.format(i, _bitarray_to_str(child_genotype))
)
# 5) Add new nodes that contain a nonterminal and genotypes to the stack
stack = _append_children(new_nodes, new_genotypes_for_children, stack)
if verbose:
print()
print("Sentential form: {}".format(derivation_tree.string()))
print()
header = "End of the derivation"
if verbose:
print(header)
print("=" * len(header))
print()
name = (
"String" if derivation_tree.is_completely_expanded() else "Sentential form"
)
print("{}: {}".format(name, derivation_tree.string()))
return derivation_tree
# Functions adhering closely to pseudocode in 2018 paper
def _rules_for(grammar, symbol):
return grammar.production_rules[symbol]
def _split_for_rule(genotype, rules):
num_rules = len(rules)
num_bits = len(genotype)
l_g = num_bits // num_rules # equivalent to float division followed by floor
first_n = num_bits % num_rules
length1 = l_g + 1
length2 = l_g
new_genotypes = []
start = 0
for i in range(num_rules):
length = length1 if i < first_n else length2
end = start + length
sub_genotype = genotype[start:end]
new_genotypes.append(sub_genotype)
start = end
return new_genotypes
def _largest_card_index(genotypes, ones_in_current_genotype):
# Caution: card == largest_card should not be used to compare float
# values but is used in the Java reference implementation and
# therefore duplicated here to deliver identical behavior (perhaps
# only on identical machines!)
# Cleaner (and a bit slower) would be: abs(card - largest_card) < EPS
indices = []
largest_card = -1.0
for i, genotype in enumerate(genotypes):
num_bits = len(genotype)
num_1 = genotype.count()
card = num_1 / num_bits
if card == largest_card:
indices.append(i)
elif card > largest_card:
largest_card = card
indices = [i]
num_options = len(indices)
if num_options == 1:
chosen_rule_index = indices[0]
print(
' g"_{} has highest relative cardinality {}'.format(
chosen_rule_index, largest_card
)
)
else:
# Comment in original Java code: for avoiding choosing always
# the 1st option in case of tie, choose depending on count of
# 1s in genotype
chosen_option = ones_in_current_genotype % num_options
chosen_rule_index = indices[chosen_option]
genotypes_str = ", ".join('g"_{}'.format(i) for i in indices)
print(
" {} share the same highest relative cardinality {}".format(
genotypes_str, largest_card
)
)
print(
' Selected g"_{} by {} % {} = {}'.format(
chosen_rule_index,
ones_in_current_genotype,
num_options,
chosen_option,
)
)
return chosen_rule_index
def _shortest_rule_index(
genotype,
symbol,
shortest_distances,
ones_in_genotype,
ones_in_full_genotype,
):
count = ones_in_genotype if len(genotype) > 0 else ones_in_full_genotype
indices = shortest_distances[symbol]
num_options = len(indices)
index = indices[count % num_options]
if num_options == 1:
print(
" Rule {} has the lowest number of steps to reach a "
"terminal sequence.".format(index)
)
else:
print(
" Rule {} share the same lowest number of steps to reach a "
"terminal sequence.".format(", ".join(str(ind) for ind in indices))
)
print(
" Selected rule {} by "
"{} % {} = {}".format(index, count, num_options, index)
)
return index
def _apply_rule(rules, chosen_rule_idx, derivation_tree, chosen_nt_node):
chosen_rule = rules[chosen_rule_idx]
new_nodes = derivation_tree._expand(chosen_nt_node, chosen_rule)
rhs = "".join(
sym.text
if isinstance(sym, _grammar.data_structures.TerminalSymbol)
else "<{}>".format(sym.text)
for sym in chosen_rule
)
print(" Substitution: <{}> -> {}".format(chosen_nt_node.symbol.text, rhs))
return new_nodes
def _drop_trailing_bit(genotype):
return genotype[:-1]
def _split_for_children(genotype, child_nodes, expressive_powers):
if len(child_nodes) > len(genotype):
genotypes = [genotype[0:0] for _ in child_nodes]
else:
lengths = _weighted_partitioning(len(genotype), child_nodes, expressive_powers)
genotypes = []
last_length = 0
for length in lengths:
new_length = last_length + length
genotype_part = genotype[last_length:new_length]
genotypes.append(genotype_part)
last_length = new_length
return genotypes
def _weighted_partitioning(length, child_nodes, expressive_powers):
"""Calculate the length of each sub-genotype (=non-overlapping parts of genotype)."""
# Design choice: Don't store 0 values for terminals, because it is
# irrelevant information and omitting it may give a minor speedup
# (and prevents NT and T name clashes if using str)
log_ep_values = [
expressive_powers[node.symbol] if node.contains_nonterminal() else 0
for node in child_nodes
]
log_ep_sum = sum(log_ep_values)
if log_ep_sum > 0:
lengths = [
int(log_ep_val / log_ep_sum * length) for log_ep_val in log_ep_values
]
else:
lengths = [0 for _ in log_ep_values]
# Distribute remaining bits
sum_lengths = sum(lengths)
if sum_lengths > 0:
min_size = int(length / sum_lengths)
num_remaining_bits = length - min_size * sum_lengths
lengths = [n * min_size for n in lengths]
else:
num_remaining_bits = length - sum(lengths)
num_substrings = len(child_nodes)
for cnt in range(num_remaining_bits):
position_j = (
cnt % num_substrings
) # Paper has +1 here because indexing starts with 1
lengths[position_j] += 1
return lengths
def _append_children(new_nodes, new_genotypes, stack):
new_stack_entries = [
(node, genotype)
for node, genotype in zip(new_nodes, new_genotypes)
if node.contains_nonterminal()
]
stack = new_stack_entries + stack
return stack
# Helper functions for printing
def _bitarray_to_str(arr):
return "".join("1" if bit else "0" for bit in arr.tolist())