# Source code for delphin.mrs.simplemrs


"""
Serialization functions for the SimpleMRS format.
"""


# Author: Michael Wayne Goodman <goodmami@uw.edu>

from __future__ import print_function

from collections import deque, defaultdict
import re
from warnings import warn

from delphin.util import stringtypes
from delphin.mrs import Mrs
from delphin.mrs.components import (
    ElementaryPredication, Pred, Lnk, HandleConstraint, IndividualConstraint,
    sort_vid_split, var_sort, var_re, hcons, icons
)
from delphin.mrs.config import (HANDLESORT, CONSTARG_ROLE)
from delphin.mrs.util import rargname_sortkey
from delphin.exceptions import (
    XmrsDeserializationError as XDE,
    XmrsError,
    XmrsWarning
)

def highlight(text):
    """Fallback used when the highlighting imports fail: return *text* as-is."""
    return text


# Colorized output is optional; if any of the imports below raises
# ImportError the plain pass-through highlight() above stays in place.
try:
    from pygments import highlight as highlight_
    from pygments.formatters import TerminalFormatter
    from delphin.extra.highlight import SimpleMrsLexer, mrs_colorscheme
    lexer = SimpleMrsLexer()
    formatter = TerminalFormatter(bg='dark', colorscheme=mrs_colorscheme)
except ImportError:
    # warnings.warn
    pass
else:
    def highlight(text):
        """Return *text* run through pygments with the SimpleMRS lexer."""
        return highlight_(text, lexer, formatter)

# versions are:
#  * 1.0 long running standard
#  * 1.1 added support for MRS-level lnk, surface and EP-level surface
# _default_version is the default value of the `version` parameter of the
# load/dump and serialize/deserialize functions in this module.
_default_version = 1.1
_latest_version = 1.1

# NOTE(review): not referenced in the visible part of this module; presumably
# the HCONS relation names considered valid -- confirm against callers.
_valid_hcons = ['qeq', 'lheq', 'outscopes']

# pretty-print options
# string placed between serialized MRSs when pretty-printing is off
_default_mrs_delim = '\n'

##############################################################################
##############################################################################
# Pickle-API methods


def load(fh, single=False, version=_default_version,
         strict=False, errors='warn'):
    """
    Deserialize SimpleMRSs from a file (handle or filename)

    Args:
        fh (str, file): input filename or file object
        single: if `True`, only return the first read Xmrs object
        version: SimpleMRS format version, passed through to loads()
        strict: deprecated; a `True` value is the same as
            `errors='strict'`, and a `False` value is the same as
            `errors='warn'`
        errors: if `'strict'`, ill-formed MRSs raise an error; if
            `'warn'`, raise a warning instead; if `'ignore'`, do not
            warn or raise errors for ill-formed MRSs
    Returns:
        a generator of Xmrs objects (unless the *single* option is
        `True`)
    """
    if isinstance(fh, stringtypes):
        # *fh* is a filename: open it ourselves and make sure the handle
        # is closed again once the contents have been read
        with open(fh, 'r') as f:
            s = f.read()
    else:
        # *fh* is an already-open file object owned by the caller
        s = fh.read()
    return loads(s, single=single, version=version,
                 strict=strict, errors=errors)
def loads(s, single=False, version=_default_version,
          strict=False, errors='warn'):
    """
    Deserialize SimpleMRS string representations

    Args:
        s (str): a SimpleMRS string
        single (bool): if `True`, only return the first Xmrs object
        version: SimpleMRS format version, passed to deserialize()
        strict: passed to deserialize()
        errors: passed to deserialize()
    Returns:
        a generator of Xmrs objects (unless *single* is `True`, in
        which case the first item of that generator is returned)
    """
    xmrs_iter = deserialize(s, version=version, strict=strict, errors=errors)
    if not single:
        return xmrs_iter
    # only the first MRS is wanted; the rest of the input is never parsed
    return next(xmrs_iter)
def dump(destination, ms, single=False, version=_default_version,
         properties=True, pretty_print=False, color=False, **kwargs):
    """
    Serialize Xmrs objects to SimpleMRS and write to a file

    Args:
        destination: filename or file object where data will be written
        ms: an iterator of Xmrs objects to serialize (unless the
            *single* option is `True`)
        single: if `True`, treat *ms* as a single Xmrs object
            instead of as an iterator
        version: SimpleMRS format version, passed to dumps()
        properties: if `False`, suppress variable properties
        pretty_print: if `True`, add newlines and indentation
        color: if `True`, colorize the output with ANSI color codes
        **kwargs: passed through to dumps()
    """
    serialized = dumps(ms,
                       single=single,
                       version=version,
                       properties=properties,
                       pretty_print=pretty_print,
                       color=color,
                       **kwargs)

    # anything without a write() method is treated as a filename
    if not hasattr(destination, 'write'):
        with open(destination, 'w') as fh:
            print(serialized, file=fh)
        return

    print(serialized, file=destination)
def dumps(ms, single=False, version=_default_version, properties=True,
          pretty_print=False, color=False, **kwargs):
    """
    Serialize an Xmrs object to a SimpleMRS representation

    Args:
        ms: an iterator of Xmrs objects to serialize (unless the
            *single* option is `True`)
        single: if `True`, treat *ms* as a single Xmrs object instead
            of as an iterator
        version: SimpleMRS format version, passed to serialize()
        properties: if `False`, suppress variable properties
        pretty_print: if `True`, add newlines and indentation
        color: if `True`, colorize the output with ANSI color codes
        **kwargs: only `indent` is inspected; a true value switches
            pretty-printing on
    Returns:
        a SimpleMrs string representation of a corpus of Xmrs
    """
    indent_requested = kwargs.get('indent')
    if indent_requested and not pretty_print:
        pretty_print = True

    # serialize() always iterates, so wrap a lone object in a list
    xmrs_seq = [ms] if single else ms

    return serialize(xmrs_seq,
                     version=version,
                     properties=properties,
                     pretty_print=pretty_print,
                     color=color)
# for convenience


def load_one(fh, **kwargs):
    """Call load() with `single=True`; other keyword arguments pass through."""
    return load(fh, single=True, **kwargs)


def loads_one(s, **kwargs):
    """Call loads() with `single=True`; other keyword arguments pass through."""
    return loads(s, single=True, **kwargs)


def dump_one(fh, m, **kwargs):
    """Call dump() with `single=True`; other keyword arguments pass through."""
    return dump(fh, m, single=True, **kwargs)


def dumps_one(m, **kwargs):
    """Call dumps() with `single=True`; other keyword arguments pass through."""
    return dumps(m, single=True, **kwargs)


##############################################################################
##############################################################################
# Deserialization

# The _tokenizer has 4 sub-regexen, tried in this order:
#   the first is for strings (e.g. "_dog_n_rel", "\"quoted string\"")
#   the second is for unquoted tokens that begin with an underscore (e.g.
#     _dog_n_rel); they run up to a space or newline, or up to a '<' that
#     opens a lnk (the negative lookahead keeps other '<' in the token)
#   the third is for args, variables, preds, etc (e.g. ARG1, x4)
#   the last is for contentful punctuation (e.g. [ ] < > : # @)
_tokenizer = re.compile(r'("[^"\\]*(?:\\.[^"\\]*)*"'
                        r'|_(?:[^ \n<]|<(?![-0-9:#@ ]*>))*'
                        r'|[^ \n:#@\[\]"<>]+'
                        r'|[:#@\[\]<>])')
def tokenize(string):
    """Split the SimpleMrs string into tokens."""
    # a deque is returned so the reader functions can consume tokens from
    # the left with popleft()
    matches = _tokenizer.findall(string)
    return deque(matches)
def _invalid_token_error(token, expected):
    """Raise an XDE reporting that *token* was found instead of *expected*."""
    raise XDE('Invalid token: "{}"\tExpected: "{}"'.format(token, expected))


def deserialize(string, version=_default_version, strict=True, errors='warn'):
    """
    Lazily parse every MRS found in a SimpleMRS string.

    This is a generator: nothing (including the deprecation warning
    below) happens until the first item is requested.

    Args:
        string: the SimpleMRS text to parse
        version: SimpleMRS format version; for values >= 1.1 an
            MRS-level lnk and surface string are read if present
        strict: deprecated; a true value forces `errors='strict'` and
            emits a DeprecationWarning (note the default is `True`)
        errors: `'strict'` re-raises validation errors, `'warn'` turns
            them into XmrsWarning warnings, `'ignore'` skips validation
    Yields:
        one Mrs object per MRS in *string*
    Raises:
        XDE: if the token stream is malformed or ends prematurely
    """
    if strict:
        # only the `warn` function is imported from the warnings module,
        # so it must be called by that name
        warn(
            'strict=True parameter is deprecated; use errors=\'strict\'',
            DeprecationWarning
        )
        errors = 'strict'
    # FIXME: consider buffering this so we don't read the whole string at once
    tokens = tokenize(string)
    while tokens:
        yield _read_mrs(tokens, version, errors)


def _read_literals(tokens, *toks):
    """
    Consume one token from *tokens* for each expected literal in *toks*.

    Raises:
        XDE: if a consumed token differs from the expected literal; the
            message shows the tokens that remain
        IndexError: if *tokens* runs out (translated by _read_mrs())
    """
    for expected in toks:
        found = tokens.popleft()
        if found != expected:
            raise XDE(
                'Expected \'{}\': {}'.format(expected, ' '.join(tokens))
            )


def _read_mrs(tokens, version, errors):
    """
    Read a single bracketed MRS from the front of *tokens*.

    Args:
        tokens: deque of tokens; consumed from the left
        version: format version; >= 1.1 enables MRS-level lnk/surface
        errors: `'ignore'` skips validation, `'warn'` reports an
            XmrsError from validation as an XmrsWarning, `'strict'`
            lets it propagate
    Returns:
        the Mrs object that was built
    Raises:
        XDE: on an unexpected literal or if the tokens end too early
    """
    try:
        _read_literals(tokens, '[')
        top = idx = surface = lnk = None
        vars_ = {}  # variable -> list of (property, value) pairs
        if version >= 1.1:
            if tokens[0] == '<':
                lnk = _read_lnk(tokens)
            if tokens[0].startswith('"'):  # and tokens[0].endswith('"'):
                surface = tokens.popleft()[1:-1]  # strip enclosing quotes
        if tokens[0].upper() in ('LTOP', 'TOP'):
            tokens.popleft()  # LTOP / TOP
            _read_literals(tokens, ':')
            top = tokens.popleft()
            vars_[top] = []
        if tokens[0].upper() == 'INDEX':
            tokens.popleft()  # INDEX
            _read_literals(tokens, ':')
            idx = tokens.popleft()
            vars_[idx] = _read_props(tokens)
        rels = _read_rels(tokens, vars_)
        # trailing underscores keep these locals from shadowing the
        # hcons/icons functions imported at module level
        hcons_ = _read_cons(tokens, 'HCONS', vars_)
        icons_ = _read_cons(tokens, 'ICONS', vars_)
        _read_literals(tokens, ']')
        # at this point, we could uniquify proplists in vars_, but most
        # likely it isn't necessary, and might harm things if we
        # leave potential dupes in there. let's see how it plays out.
        m = Mrs(top=top, index=idx, rels=rels,
                hcons=hcons_, icons=icons_,
                lnk=lnk, surface=surface,
                vars=vars_)
    except IndexError:
        # indexing or popping an exhausted deque: the input stopped
        # mid-MRS (the helper always raises, so `m` is bound below)
        _unexpected_termination_error()
    if errors != 'ignore':
        try:
            m.validate()
        except XmrsError as ex:
            if errors == 'warn':
                warn(str(ex), XmrsWarning)
            elif errors == 'strict':
                raise
    return m


def _read_props(tokens):
    """
    Read an optional `[ SORT KEY: VAL ... ]` property list.

    Returns:
        a list of (key, value) pairs; empty if the next token is not
        `[`. The token right after `[` is consumed and discarded.
    """
    props = []
    if tokens[0] == '[':
        tokens.popleft()  # [
        tokens.popleft()  # the variable type; not kept
        while tokens[0] != ']':
            key = tokens.popleft()
            _read_literals(tokens, ':')
            val = tokens.popleft()
            props.append((key, val))
        tokens.popleft()  # ]
    return props


def _read_rels(tokens, vars_):
    """
    Read an optional `RELS: < ... >` list of elementary predications.

    Node identifiers are assigned sequentially starting at 10000.

    Returns:
        a list of EPs, or `None` if the next token is not `RELS`
    """
    rels = None
    nid = 10000
    if tokens[0].upper() == 'RELS':
        rels = []
        tokens.popleft()  # RELS
        _read_literals(tokens, ':', '<')
        while tokens[0] != '>':
            rels.append(_read_ep(tokens, nid, vars_))
            nid += 1
        tokens.popleft()  # >
    return rels


def _read_ep(tokens, nid, vars_):
    """
    Read one `[ pred<lnk> "surface" LBL: h ROLE: val ... ]` EP.

    Variables encountered (and any properties listed after them) are
    recorded in *vars_* as a side effect.

    Returns:
        an ElementaryPredication built with node identifier *nid*
    """
    _read_literals(tokens, '[')
    pred = Pred.surface_or_abstract(tokens.popleft())
    lnk = _read_lnk(tokens)
    surface = label = None
    if tokens[0].startswith('"'):
        surface = tokens.popleft()[1:-1]  # strip enclosing quotes
    if tokens[0].upper() == 'LBL':
        tokens.popleft()  # LBL
        _read_literals(tokens, ':')
        label = tokens.popleft()
        vars_[label] = []
    args = {}
    while tokens[0] != ']':
        role = tokens.popleft().upper()  # role names are stored uppercased
        _read_literals(tokens, ':')
        val = tokens.popleft()
        if role == CONSTARG_ROLE:
            # constant arguments are stored without surrounding quotes
            if val and (val[0], val[-1]) == ('"', '"'):
                val = val[1:-1]
        elif var_re.match(val) is not None:
            # a variable may be followed by its property list
            vars_.setdefault(val, []).extend(_read_props(tokens))
        args[role] = val
    tokens.popleft()  # ]
    return ElementaryPredication(nid, pred, label, args, lnk, surface)


def _read_cons(tokens, constype, vars_):
    """
    Read an optional `HCONS: < ... >` or `ICONS: < ... >` list.

    Args:
        tokens: deque of tokens; consumed from the left
        constype: the section keyword to look for (`'HCONS'` or
            `'ICONS'`), compared against the uppercased next token
        vars_: variable -> property-list mapping, updated in place with
            the properties found on either side of each constraint
    Returns:
        a list of (left, relation, right) tuples with the relation
        lowercased, or `None` if the section is absent
    """
    cons = None
    if tokens[0].upper() == constype:
        cons = []
        tokens.popleft()  # (H|I)CONS
        _read_literals(tokens, ':', '<')
        while tokens[0] != '>':
            left = tokens.popleft()
            lprops = _read_props(tokens)
            reln = tokens.popleft().lower()
            rght = tokens.popleft()
            rprops = _read_props(tokens)
            cons.append((left, reln, rght))
            # update properties: each side receives its own property list
            vars_.setdefault(left, []).extend(lprops)
            vars_.setdefault(rght, []).extend(rprops)
        tokens.popleft()  # >
    return cons


def _read_lnk(tokens):
    """
    Read an optional lnk from the front of *tokens*.

    Recognized forms are `<FROM:TO>` (character span), `<FROM#TO>`
    (chart span), `<TOK ...>` (token indices), `<@EDGE>` (edge) and
    `<>` (treated the same as no lnk).

    Returns:
        the object built by the matching Lnk constructor, or `None` if
        the next token is not `<` or the brackets are empty
    """
    # < FROM : TO > or < FROM # TO > or < TOK... > or < @ EDGE >
    lnk = None
    if tokens[0] == '<':
        tokens.popleft()  # we just checked this is a left angle
        if tokens[0] == '>':
            pass  # empty <> brackets the same as no lnk specified
        # edge lnk: ['@', EDGE, ...]
        elif tokens[0] == '@':
            tokens.popleft()  # remove the @
            lnk = Lnk.edge(tokens.popleft())  # edge lnks only have one number
        # character span lnk: [FROM, ':', TO, ...]
        elif tokens[1] == ':':
            cfrom = tokens.popleft()
            tokens.popleft()  # the colon
            cto = tokens.popleft()
            lnk = Lnk.charspan(cfrom, cto)
        # chart vertex range lnk: [FROM, '#', TO, ...]
        elif tokens[1] == '#':
            vfrom = tokens.popleft()
            tokens.popleft()  # the hash
            vto = tokens.popleft()
            lnk = Lnk.chartspan(vfrom, vto)
        # tokens lnk: [(TOK,)+ ...]
        else:
            lnkdata = []
            while tokens[0] != '>':
                lnkdata.append(int(tokens.popleft()))
            lnk = Lnk.tokens(lnkdata)
        _read_literals(tokens, '>')
    return lnk


def _unexpected_termination_error():
    """Raise the XDE used when the input ends in the middle of an MRS."""
    raise XDE('Invalid MRS: Unexpected termination.')


##############################################################################
##############################################################################
# Encoding
def serialize(ms, version=_default_version, properties=True,
              pretty_print=False, color=False):
    """Serialize an MRS structure into a SimpleMRS string.

    Each item of *ms* is rendered with _serialize_mrs() and the results
    are joined with a newline when *pretty_print* is true, otherwise
    with `_default_mrs_delim`. If *color* is true the joined text is
    passed through highlight() before being returned.
    """
    if pretty_print:
        delim = '\n'
    else:
        delim = _default_mrs_delim

    rendered = [
        _serialize_mrs(m, properties=properties, version=version,
                       pretty_print=pretty_print)
        for m in ms
    ]
    output = delim.join(rendered)

    return highlight(output) if color else output
def _serialize_mrs(m, properties, version=_default_version,
                   pretty_print=False):
    """
    Render one MRS *m* as a bracketed SimpleMRS string.

    Args:
        m: the MRS object to render
        properties: if false, no variable properties are written
        version: format version; >= 1.1 writes the MRS-level lnk and
            surface and uses `TOP` instead of `LTOP`
        pretty_print: if true, sections and EPs are separated by a
            newline plus indent instead of a single space
    """
    # note that varprops is modified as a side-effect of the lower
    # functions (a variable's entry is removed once it has been printed)
    varprops = {}
    if properties:
        for var, data in m._vars.items():
            if data['props']:
                varprops[var] = data['props']

    # NOTE(review): SOURCE shows one space after the newline in both
    # pretty-print separators; if whitespace was collapsed when this file
    # was extracted the intended indents may be wider -- confirm upstream.
    ep_sep = '\n ' if pretty_print else ' '
    section_sep = '\n ' if pretty_print else ' '

    toks = []
    if version >= 1.1:
        header_toks = []
        if m.lnk is not None and m.lnk.data != (-1, -1):  # don't do <-1:-1>
            header_toks.append(_serialize_lnk(m.lnk))
        if m.surface is not None:
            header_toks.append('"{}"'.format(m.surface))
        if header_toks:
            toks.append(' '.join(header_toks))

    if m.top is not None:
        top_role = 'TOP' if version >= 1.1 else 'LTOP'
        toks.append(_serialize_argument(top_role, m.top, varprops))
    if m.index is not None:
        toks.append(_serialize_argument('INDEX', m.index, varprops))

    ep_strings = [_serialize_ep(ep, varprops, version=version)
                  for ep in m.eps()]
    toks.append('RELS: < {eps} >'.format(eps=ep_sep.join(ep_strings)))

    toks.append(_serialize_hcons(hcons(m)))
    icons_ = icons(m)
    if icons_:  # make unconditional for "ICONS: < >"
        toks.append(_serialize_icons(icons_))

    return '{} {} {}'.format('[', section_sep.join(toks), ']')


def _serialize_argument(rargname, value, varprops):
    """Serialize an MRS argument into the SimpleMRS format.

    A value for the constant-argument role is wrapped in double quotes.
    If *value* has an entry in *varprops*, its properties are appended
    in brackets and the entry is deleted so they are printed only once.
    """
    if rargname == CONSTARG_ROLE:
        value = '"{}"'.format(value)

    props = ''
    if value in varprops:
        pieces = [var_sort(value)]
        for key, val in varprops[value]:
            pieces.append('{0[0]}: {0[1]}'.format((key.upper(), val)))
        props = ' [ {} ]'.format(' '.join(pieces))
        del varprops[value]  # only print props once

    return '{rargname}: {value}{props}'.format(
        rargname=rargname,
        value=str(value),
        props=props
    )


def _serialize_ep(ep, varprops, version=_default_version):
    """Serialize an Elementary Predication into the SimpleMRS encoding."""
    # ep is indexed positionally:
    # ('nodeid', 'pred', 'label', 'args', 'lnk', 'surface', 'base')
    args = ep[3]
    serialized_args = []
    for rarg in sorted(args, key=rargname_sortkey):
        serialized_args.append(
            _serialize_argument(rarg, args[rarg], varprops)
        )
    arglist = ' '.join(serialized_args)

    # the EP-level surface string is only written for version >= 1.1
    if version >= 1.1 and len(ep) >= 6 and ep[5] is not None:
        surface = ' "%s"' % ep[5]
    else:
        surface = ''

    lnk = ep[4] if len(ep) >= 5 else None
    predstr = ep[1].string

    return '[ {pred}{lnk}{surface} LBL: {label}{s}{args} ]'.format(
        pred=predstr,
        lnk=_serialize_lnk(lnk),
        surface=surface,
        label=str(ep[2]),
        s=' ' if arglist else '',
        args=arglist
    )


def _serialize_lnk(lnk):
    """Serialize a predication lnk to surface form into the SimpleMRS
       encoding."""
    if lnk is None:
        return ""

    body = ''
    if lnk.type == Lnk.CHARSPAN:
        cfrom, cto = lnk.data
        body = str(cfrom) + ':' + str(cto)
    elif lnk.type == Lnk.CHARTSPAN:
        cfrom, cto = lnk.data
        body = str(cfrom) + '#' + str(cto)
    elif lnk.type == Lnk.TOKENS:
        body = ' '.join([str(t) for t in lnk.data])
    elif lnk.type == Lnk.EDGE:
        body = '@' + str(lnk.data)
    # an unrecognized lnk type leaves the brackets empty
    return '<' + body + '>'


def _serialize_hcons(hcons):
    """Serialize [HandleConstraints] into the SimpleMRS encoding."""
    # each constraint is an iterable of strings flattened into the output
    flattened = [part for hc in hcons for part in hc]
    return ' '.join(['HCONS:', '<'] + flattened + ['>'])


def _serialize_icons(icons):
    """Serialize [IndividualConstraints] into the SimpleMRS encoding."""
    # each constraint is an iterable of strings flattened into the output
    flattened = [part for ic in icons for part in ic]
    return ' '.join(['ICONS:', '<'] + flattened + ['>'])