Source code for grammaropt.grammar

"""
This is the base module where the grammar can be built
and the base walkers are defined.
See `Walker` to learn more about walkers.
"""
from types import MethodType
from collections import deque
from collections import namedtuple
from functools import partial

from parsimonious.grammar import Grammar
from parsimonious.expressions import Expression
from parsimonious.expressions import Sequence
from parsimonious.expressions import OneOf
from parsimonious.expressions import Compound
from parsimonious.expressions import Literal
from parsimonious.nodes import Node
from parsimonious.nodes import RegexNode

from .terminal_types import Type


def build_grammar(rules, types={}):
    """
    Build a Grammar object based on a set of rules and a set of types.
    The syntax of the `rules` string is documented in the `parsimonious` documentation.
    Types are special kinds of terminals that describe a type (for instance, Int or Float).
    Types are used so that walkers are aware of them and can thus give them
    special treatment (consider them as values instead of strings).

    Parameters
    ----------
    
    rules : str
        string following an EBNF-like syntax used by `parsimonious` to represent grammar rules.
    types : dict of str->Type
        maps a type name to a Type object.
        type names can be used inside the `rules` as terminals.

    Returns
    -------
    `parsimonious` Grammar object

    """
    more_rules = _build_type_rules(types)
    return Grammar(rules, **more_rules)
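
# Illustrative sketch (a minimal example, not exercised by the module itself):
# build a tiny arithmetic grammar with `build_grammar`. No custom `Type`
# terminals are used here; a types dict such as {"int": SomeIntType()}
# (hypothetical name) could be passed as the second argument so that "int"
# becomes usable as a terminal inside `rules`.
def _example_build_grammar():
    rules = """
    expr = term (op term)*
    term = "x" / "1"
    op   = "+" / "*"
    """
    grammar = build_grammar(rules)
    # the grammar can parse strings generated by these rules
    grammar.parse("x+1*x")
    return grammar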


def extract_rules_from_grammar(grammar):
    """
    extract all the rules (including unnamed ones) from a grammar

    Parameters
    ----------

    grammar : Grammar
        grammar from which to extract the rules

    Returns
    -------

    set of `parsimonious` `Expression` objects
    """
    full_rules = set()
    _extract_rules(grammar.values(), out=full_rules)
    return full_rules
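
# Illustrative sketch: `extract_rules_from_grammar` also yields the anonymous
# sub-expressions (e.g., the `(op term)*` group of the toy grammar above),
# not only the named rules.
def _example_extract_rules():
    grammar = _example_build_grammar()
    rules = extract_rules_from_grammar(grammar)
    named = sorted(r.name for r in rules if r.name)
    assert named == ["expr", "op", "term"]
    return rules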


def _extract_rules(rules, out=None):
    """
    recursively put all rules from `rules` into `out`
    """
    # avoid a mutable default argument: the default set would be shared
    # across calls
    if out is None:
        out = set()
    for rule in rules:
        if rule not in out:
            out.add(rule)
            if isinstance(rule, Compound):
                _extract_rules(rule.members, out=out)

# TODO: @lru_cache(maxsize=None) was removed because it was causing
# a recursion error with parsimonious 0.8
def rule_depth(rule):
    return _rule_depth(rule, depths=None)


def _rule_depth(rule, depths=None):
    """
    get the minimal number of production rules to reach a terminal, starting
    from a rule `rule`.
    TODO : should be replaced by dijkstra algo for more efficiency
    """
    if depths is None:
        depths = {}
    # detect and deal with cycles
    if rule in depths:
        return depths[rule]
    depths[rule] = float('inf')
    if isinstance(rule, OneOf):
        depth = 1 + min(map(partial(_rule_depth, depths=depths), rule.members))
    elif isinstance(rule, Sequence):
        depth = 1 + max(map(partial(_rule_depth, depths=depths), rule.members))
    else:
        depth = 0
    depths[rule] = depth
    return depth
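
# Illustrative sketch of `rule_depth` on a toy recursive grammar: `OneOf`
# takes the cheapest branch (min) while `Sequence` must expand all of its
# members (max), so the depth is the length of the shortest derivation that
# reaches a terminal.
def _example_rule_depth():
    grammar = build_grammar("""
    expr = ("(" expr ")") / leaf
    leaf = "x"
    """)
    assert rule_depth(grammar["leaf"]) == 0  # already a terminal
    assert rule_depth(grammar["expr"]) == 1  # one production reaches `leaf`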


def _build_type_rules(types):
    rules = {}
    for name, type_ in types.items():
        rules[name] = type_
        rules[name].name = name  # TODO: is this necessary?
    return rules


class Walker:
    """
    Walkers are objects that do a random walk (deterministic walks are a
    special case of random walks) on the production rule space of a given
    grammar: the responsibility of a Walker is to choose a production rule
    whenever it has to, and to choose `Type` values whenever it has to.
    After the end of the walk, a trace is left describing the traversal of
    the space. The trace differs depending on the kind of Walker.

    Parameters
    ----------

    grammar : Grammar
        grammar on which to walk
    min_depth : int
        minimum depth of the parse tree.
    max_depth : int
        maximum depth of the parse tree.
        Note that the tree can still exceed `max_depth`: when `max_depth`
        is reached, there is no guarantee that a terminal production rule
        is available to choose. The solution to this problem is that when
        `max_depth` is reached, non-terminal production rules stop being
        candidates, but when the only available choices are non-terminal
        production rules, we choose one of them anyway, even if `max_depth`
        is exceeded; otherwise the obtained string would not be valid
        according to the grammar.
    strict_depth_limit : bool
        if True, when `max_depth` is reached, forbid any further production
        rules when a choice has to be made. If False, even when `max_depth`
        is reached, choose terminals when terminals are available, otherwise
        keep applying production rules.
    """
    def __init__(self, grammar, min_depth=None, max_depth=None, strict_depth_limit=False):
        if min_depth and max_depth:
            assert min_depth <= max_depth
        self.grammar = grammar
        self.min_depth = min_depth
        self.max_depth = max_depth
        self.strict_depth_limit = strict_depth_limit

    def next_rule(self, rules):
        """
        Given a list of candidate production `rules` (already filtered by
        the depth constraints), choose the next one.
        Implemented by specific Walkers.
        """
        raise NotImplementedError()

    def next_value(self, rule):
        """
        Given a `Type` rule, choose its value.
        Implemented by specific Walkers.
        """
        raise NotImplementedError()

    def _init_walk(self):
        """
        Called each time to initialize the walk; it can be used to empty
        the trace. Should be overloaded by specific Walkers.
        """
        self.terminals = []

    def walk(self, start=None):
        """
        Do a random walk on the grammar.

        Parameters
        ----------

        start : str
            name of the starting rule.
            If `start` is not provided, the default rule of the grammar
            (which is the first rule) is used.
        """
        self._init_walk()
        grammar = self.grammar
        if start is None:
            rule = grammar.default_rule
        else:
            rule = grammar[start]
        # each element of the stack is (rule, depth)
        # top elements of the stack are at the right
        # (the first is the oldest, the last the newest)
        stack = deque([(rule, 0)])
        while len(stack):
            rule, depth = stack.pop()
            if isinstance(rule, OneOf):
                members = self._filter_by_depth(rule.members, depth)
                if len(members):
                    chosen_rule = self.next_rule(members)
                    stack.append((chosen_rule, depth + 1))
            elif isinstance(rule, Sequence):
                members = rule.members
                members = [(m, depth + 1) for m in members]
                # push them in reverse order so that when popped the order
                # is the same as the order of `members`
                stack.extend(members[::-1])
            elif isinstance(rule, Literal):
                val = rule.literal
                self.terminals.append(val)
            elif isinstance(rule, Type):
                val = self.next_value(rule)
                self.terminals.append(val)

    def _filter_by_depth(self, rules, depth):
        if self.min_depth is not None and depth <= self.min_depth:
            if self.strict_depth_limit:
                return []
            depths = list(map(rule_depth, rules))
            max_depth = max(depths)
            rules = [r for r, d in zip(rules, depths) if d == max_depth]
            return rules
        elif self.max_depth is not None and depth >= self.max_depth:
            if self.strict_depth_limit:
                return []
            depths = list(map(rule_depth, rules))
            min_depth = min(depths)
            rules = [r for r, d in zip(rules, depths) if d == min_depth]
            return rules
        else:
            return rules
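
# A minimal concrete Walker, as a sketch of the interface only: production
# rules are picked uniformly at random, and `Type` values are delegated to
# the terminal itself. A `sample`-like method on `Type` is an assumption
# here; the actual `Type` API may differ (the framework provides its own
# walkers elsewhere, e.g. an RNN-based one in `grammaropt.rnn`).
import random


class _UniformWalker(Walker):
    def next_rule(self, rules):
        # choose uniformly among the candidate production rules
        return random.choice(rules)

    def next_value(self, rule):
        # assumption: the `Type` terminal knows how to sample its own value
        return rule.sample()

# usage: _UniformWalker(grammar, max_depth=5).walk() fills `walker.terminals`,
# which `as_str` (defined below) can turn back into a string.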
_Decision = namedtuple('Decision', ['rule', 'choice'])

class DeterministicWalker(Walker):
    """
    a very specific Walker that takes a given str expression `expr`, parses
    it using the grammar `grammar`, then uses the parse tree to force the
    next rule and the next value chosen to correspond exactly to the
    expression `expr`. The trace of this very specific walker is used by
    the RNN walker to compute the loss for given ground-truth expressions.
    Unfortunately, the `_uncached_match` method of the `OneOf` and `Type`
    rules of the `parsimonious` grammar had to be patched to get some
    information missing from the trace: the missing information needed by
    `DeterministicWalker` is the parent rule that is used to create a
    `Node`.
    """
    def __init__(self, grammar, expr):
        super().__init__(grammar)
        self.expr = expr
        self._init_walk()

    def _init_walk(self):
        super()._init_walk()
        self.decisions = []

    def walk(self):
        # WARNING: this function has a side effect on the grammar object
        # `self.grammar`, but it cleans things up at the end of the call.
        # I haven't yet found a better way of doing that (see the
        # description of `DeterministicWalker` to see why).
        grammar = self.grammar
        self._init_walk()
        rules = extract_rules_from_grammar(grammar)
        # patch the OneOf and Type _uncached_match methods
        for rule in rules:
            if isinstance(rule, OneOf):
                rule._uncached_match_orig = rule._uncached_match
                rule._uncached_match = MethodType(_patched_oneof_match, rule)
            elif isinstance(rule, Type):
                rule._uncached_match_orig = rule._uncached_match
                rule._uncached_match = MethodType(_patched_type_match, rule)
        node = grammar.parse(self.expr)
        stack = deque([node])
        while len(stack):
            node = stack.pop()
            # these are all nodes where choices have been made
            # (that is, either `OneOf` or `Type`)
            if hasattr(node, 'parent_rule'):
                # OneOf nodes
                if isinstance(node.parent_rule, OneOf):
                    self.decisions.append(_Decision(rule=node.parent_rule, choice=node.rule))
                # Type nodes
                elif isinstance(node.parent_rule, Type):
                    val = node.parent_rule.from_str(node.text)
                    self.decisions.append(_Decision(rule=node.parent_rule, choice=val))
                else:
                    raise TypeError('Unrecognized parent rule : {}'.format(node.parent_rule))
            # terminals
            if len(node.children) == 0:
                val = node.text
                if hasattr(node, 'parent_rule') and isinstance(node.parent_rule, Type):
                    val = node.parent_rule.from_str(val)
                self.terminals.append(val)
            stack.extend(node.children[::-1])
        # unpatch the OneOf and Type _uncached_match methods
        for rule in rules:
            if hasattr(rule, '_uncached_match_orig'):
                rule._uncached_match = rule._uncached_match_orig
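
# Usage sketch for `DeterministicWalker` on a toy grammar: walking a known
# expression records in `decisions` which `OneOf` member was taken at each
# step, while `terminals` reproduces the expression itself.
def _example_deterministic_walker():
    grammar = build_grammar("""
    expr = ("(" expr ")") / leaf
    leaf = "x"
    """)
    wl = DeterministicWalker(grammar, "((x))")
    wl.walk()
    assert as_str(wl.terminals) == "((x))"
    # one _Decision per `OneOf` encountered while parsing
    return wl.decisions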

def _patched_oneof_match(self, text, pos, cache, error):
    # pasted from `parsimonious` source code (parsimonious.expressions.OneOf)
    # the only difference is the two added lines
    for m in self.members:
        node = m.match_core(text, pos, cache, error)
        if node is not None:
            node = Node(self.name, text, pos, node.end, children=[node])
            node = _Wrapper(node)  # proxy class to bypass __slots__ of Node
            node.parent_rule = self  # newly added line
            node.rule = m  # newly added line
            return node


def _patched_type_match(self, text, pos, cache, error):
    # pasted from `parsimonious` source code (parsimonious.expressions.Regex)
    # the only difference is one added line
    m = self.re.match(text, pos)
    if m is not None:
        span = m.span()
        node = RegexNode(self.name, text, pos, pos + span[1] - span[0])
        node = _Wrapper(node)  # proxy class to bypass __slots__ of RegexNode
        node.match = m  # TODO: A terrible idea for cache size?
        node.parent_rule = self  # newly added line
        return node


def as_str(terminals):
    """
    converts a list of terminals (either values or str) to a single str
    (the concatenation of all the terminals).
    It is used to convert the trace obtained by a Walker
    (in `walker.terminals`) into a single str.
    """
    return ''.join(map(str, terminals))


class DecisionsWalker(Walker):
    """
    simple Walker which is used by `Vectorizer` to recover the terminals
    from a sequence of `decisions` obtained from `DeterministicWalker`.
    From the terminals, it is possible to obtain the string expression by
    concatenating them.
    """
    def __init__(self, grammar, decisions):
        super().__init__(grammar, min_depth=None, max_depth=None, strict_depth_limit=False)
        self.decisions = decisions

    def _init_walk(self):
        super()._init_walk()
        self._external_decisions = self.decisions[::-1]

    def next_rule(self, rules):
        parent_rule, rule = self._external_decisions.pop()
        return rule

    def next_value(self, rule):
        # signature matches `Walker.next_value`, which `walk` calls
        rule_, val = self._external_decisions.pop()
        return val


NULL_SYMBOL = None


class Vectorizer:
    """
    a class that is used to vectorize a list of string expressions into a
    list of lists, where each element represents a string expression as a
    sequence of rule IDs (integers), each rule ID corresponding to the
    decision for a rule according to the grammar.
    `Vectorizer` can be used to train sequence models (e.g., RNNs) without
    using the ones provided by the framework (that is, in the module
    `grammaropt.rnn`), if this is desired.

    Parameters
    ----------

    grammar : Grammar
        grammar used to parse and encode the expressions
    pad : bool
        if True, pad with the NULL character on the right up to
        `max_length`, that is, each transformed element will have exactly
        the length `max_length` when padding is used.
    max_length : int or None
        used if `pad` is True; it specifies the length that all
        transformed elements will have.
    """
    def __init__(self, grammar, pad=True, max_length=None):
        self.grammar = grammar
        self.pad = pad
        self.max_length = max_length
        self._rules = None
        self.tok_to_id = None

    def _init(self):
        if not self._rules:
            self._rules = extract_rules_from_grammar(self.grammar)
        if not self.tok_to_id:
            self.tok_to_id = {}
            self.tok_to_id[NULL_SYMBOL] = 0
            self.tok_to_id.update({r: i + 1 for i, r in enumerate(self._rules)})
            self._id_to_tok = {i: r for r, i in self.tok_to_id.items()}

    def transform(self, doc):
        """
        transform takes as input a list of strings, and returns a list of
        lists of integers.
        If `max_length` is not given (that is, it is None) and `pad` is
        True, then `max_length` will be the maximum length of the
        transformed elements of `doc`.

        Parameters
        ----------

        doc : list of str
        """
        self._init()
        doc = [self._transform_single(expr) for expr in doc]
        if self.pad:
            if self.max_length is None:
                max_length = max(map(len, doc))
            else:
                max_length = self.max_length
            for expr in doc:
                assert len(expr) <= max_length, "All expressions of `doc` must not be longer than max_length={}, but {} is longer".format(max_length, expr)
            doc = [expr + [0] * (max_length - len(expr)) for expr in doc]
        return doc

    def _transform_single(self, expr):
        wl = DeterministicWalker(self.grammar, expr)
        wl.walk()
        for decision in wl.decisions:
            assert isinstance(decision.choice, Expression), "Value-kind decisions are not supported"
        return [self.tok_to_id[decision.choice] for decision in wl.decisions]

    def inverse_transform(self, doc):
        """
        inverse_transform is the inverse operation of transform.
        It takes as input a list of lists of ints, and returns a list of
        strings.

        Parameters
        ----------

        doc : list of list of int
        """
        self._init()
        return [self._inverse_transform_single(expr) for expr in doc]

    def _inverse_transform_single(self, expr):
        # strip the padding: NULL_SYMBOL is encoded as the integer ID 0, so
        # compare against its ID rather than against NULL_SYMBOL itself
        null_id = self.tok_to_id[NULL_SYMBOL]
        expr = [symb for symb in expr if symb != null_id]
        decisions = [_Decision(rule=None, choice=self._id_to_tok[symb]) for symb in expr]
        wl = DecisionsWalker(self.grammar, decisions)
        wl.walk()
        return as_str(wl.terminals)


class _Wrapper(object):
    '''
    Source : http://code.activestate.com/recipes/577555-object-wrapper-class/
    Object wrapper class.
    This is a wrapper for objects. It is initialised with the object to wrap
    and then proxies the unhandled getattribute methods to it.
    Other classes are to inherit from it.
    '''
    def __init__(self, obj):
        '''
        Wrapper constructor.
        @param obj: object to wrap
        '''
        # wrap the object
        self._wrapped_obj = obj

    def __getattr__(self, attr):
        # see if this object has attr
        # NOTE do not use hasattr, it goes into infinite recursion
        if attr in self.__dict__:
            # this object has it
            return getattr(self, attr)
        # proxy to the wrapped object
        return getattr(self._wrapped_obj, attr)
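
# Round-trip sketch for `Vectorizer` on a toy grammar: expressions are
# encoded as padded sequences of rule IDs and decoded back. The exact
# integers depend on set iteration order, so only the padded lengths and
# the round-trip are checked here.
def _example_vectorizer():
    grammar = build_grammar("""
    expr = ("(" expr ")") / leaf
    leaf = "x"
    """)
    vect = Vectorizer(grammar, pad=True)
    codes = vect.transform(["x", "((x))"])
    assert len(codes[0]) == len(codes[1])  # padded to the same length
    assert vect.inverse_transform(codes) == ["x", "((x))"]
    return codes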