Source code for delphin.mrs.eds


"""
Elementary Dependency Structures (EDS).
"""

from __future__ import print_function

from itertools import count

from delphin.util import stringtypes, _bfs, _connected_components
from delphin.mrs.xmrs import Xmrs
from delphin.mrs.components import (
    var_sort,
    Lnk,
    Pred,
    Node,
    nodes as make_nodes
)
from delphin.mrs.query import find_argument_target
from delphin.mrs.util import rargname_sortkey
from delphin.mrs.config import CVARSORT
from delphin.lib.pegre import (
    literal as lit,
    regex,
    nonterminal as nt,
    sequence as seq,
    zero_or_more,
    optional as opt,
    delimited,
    bounded,
    Ignore,
    Peg,
    Spacing,
    Integer,
    DQString
)


[docs]class Eds(object): """ An Elementary Dependency Structure (EDS) instance. EDS are semantic structures deriving from MRS, but they are not interconvertible with MRS and therefore do not share a common superclass (viz, :class:`~delphin.mrs.xmrs.Xmrs`) in PyDelphin, although this may change in a future version. EDS shares some common features with DMRS, so this class borrows some DMRS elements, such as :class:`Nodes <delphin.mrs.components.Node>`. Args: top: nodeid of the top node in the graph nodes: iterable of :class:`Nodes <delphin.mrs.components.Node>` edges: iterable of (start, role, end) triples """ def __init__(self, top=None, nodes=None, edges=None): if nodes is None: nodes = [] if edges is None: edges = [] self.top = top self._nodeids = [node.nodeid for node in nodes] self._nodes = {node.nodeid: node for node in nodes} self._edges = {node.nodeid: {} for node in nodes} for start, rargname, end in edges: self._edges[start][rargname] = end
[docs] @classmethod def from_xmrs(cls, xmrs, predicate_modifiers=False, **kwargs): """ Instantiate an Eds from an Xmrs (lossy conversion). Args: xmrs (:class:`~delphin.mrs.xmrs.Xmrs`): Xmrs instance to convert from predicate_modifiers (function, bool): function that is called as `func(xmrs, deps)` after finding the basic dependencies (`deps`), returning a mapping of predicate-modifier dependencies; the form of `deps` and the returned mapping are `{head: [(role, dependent)]}`; if *predicate_modifiers* is `True`, the function is created using :func:`non_argument_modifiers` as: `non_argument_modifiers(role="ARG1", connecting=True); if *predicate_modifiers* is `False`, only the basic dependencies are returned """ eps = xmrs.eps() deps = _find_basic_dependencies(xmrs, eps) # if requested, find additional dependencies not captured already if predicate_modifiers is True: func = non_argument_modifiers(role='ARG1', only_connecting=True) addl_deps = func(xmrs, deps) elif predicate_modifiers is False or predicate_modifiers is None: addl_deps = {} elif hasattr(predicate_modifiers, '__call__'): addl_deps = predicate_modifiers(xmrs, deps) else: raise TypeError('a boolean or callable is required') for nid, deplist in addl_deps.items(): deps.setdefault(nid, []).extend(deplist) ids = _unique_ids(eps, deps) root = _find_root(xmrs) if root is not None: root = ids[root] nodes = [Node(ids[n.nodeid], *n[1:]) for n in make_nodes(xmrs)] edges = [(ids[a], rarg, ids[b]) for a, deplist in deps.items() for rarg, b in deplist] return cls(top=root, nodes=nodes, edges=edges)
def __eq__(self, other): if not isinstance(other, Eds): return False return ( self.top == other.top and all(a == b for a, b in zip(self.nodes(), other.nodes())) and self._edges == other._edges )
[docs] def nodeids(self): """Return the list of nodeids.""" return list(self._nodeids)
[docs] def node(self, nodeid): """Return the node whose `nodeid` is *nodeid*.""" return self._nodes[nodeid]
[docs] def nodes(self): """Return the list of nodes.""" getnode = self._nodes.__getitem__ return [getnode(nid) for nid in self._nodeids]
[docs] def edges(self, nodeid): """Return the edges starting at *nodeid*.""" return dict(self._edges.get(nodeid, []))
[docs] def to_dict(self, properties=True): """ Encode the Eds as a dictionary suitable for JSON serialization. """ nodes = {} for node in self.nodes(): nd = { 'label': node.pred.short_form(), 'edges': self.edges(node.nodeid) } if node.lnk is not None: nd['lnk'] = {'from': node.cfrom, 'to': node.cto} if properties: if node.cvarsort is not None: nd['type'] = node.cvarsort props = node.properties if props: nd['properties'] = props if node.carg is not None: nd['carg'] = node.carg nodes[node.nodeid] = nd return {'top': self.top, 'nodes': nodes}
[docs] @classmethod def from_dict(cls, d): """ Decode a dictionary, as from :meth:`to_dict`, into an Eds object. """ makepred, charspan = Pred.surface_or_abstract, Lnk.charspan top = d.get('top') nodes, edges = [], [] for nid, node in d.get('nodes', {}).items(): props = node.get('properties', {}) if 'type' in node: props[CVARSORT] = node['type'] if not props: props = None lnk = None if 'lnk' in node: lnk = charspan(node['lnk']['from'], node['lnk']['to']) nodes.append( Node( nodeid=nid, pred=makepred(node['label']), sortinfo=props, lnk=lnk, carg=node.get('carg') ) ) edges.extend( (nid, rargname, tgtnid) for rargname, tgtnid in node.get('edges', {}).items() ) nodes.sort(key=lambda n: (n.cfrom, -n.cto)) return cls(top, nodes=nodes, edges=edges)
[docs] def to_triples(self, short_pred=True, properties=True): """ Encode the Eds as triples suitable for PENMAN serialization. """ node_triples, edge_triples = [], [] # sort nodeids just so top var is first nodes = sorted(self.nodes(), key=lambda n: n.nodeid != self.top) for node in nodes: nid = node.nodeid pred = node.pred.short_form() if short_pred else node.pred.string node_triples.append((nid, 'predicate', pred)) if node.lnk: node_triples.append((nid, 'lnk', '"{}"'.format(str(node.lnk)))) if node.carg: node_triples.append((nid, 'carg', '"{}"'.format(node.carg))) if properties: if node.cvarsort is not None: node_triples.append((nid, 'type', node.cvarsort)) props = node.properties node_triples.extend((nid, p, v) for p, v in props.items()) edge_triples.extend( (nid, rargname, tgt) for rargname, tgt in sorted( self.edges(nid).items(), key=lambda x: rargname_sortkey(x[0]) ) ) return node_triples + edge_triples
[docs] @classmethod def from_triples(cls, triples): """ Decode triples, as from :meth:`to_triples`, into an Eds object. """ nids, nd, edges = [], {}, [] for src, rel, tgt in triples: if src not in nd: nids.append(src) nd[src] = {'pred': None, 'lnk': None, 'carg': None, 'si': []} if rel == 'predicate': nd[src]['pred'] = Pred.surface_or_abstract(tgt) elif rel == 'lnk': cfrom, cto = tgt.strip('"<>').split(':') nd[src]['lnk'] = Lnk.charspan(int(cfrom), int(cto)) elif rel == 'carg': if (tgt[0], tgt[-1]) == ('"', '"'): tgt = tgt[1:-1] nd[src]['carg'] = tgt elif rel == 'type': nd[src]['si'].append((CVARSORT, tgt)) elif rel.islower(): nd[src]['si'].append((rel, tgt)) else: edges.append((src, rel, tgt)) nodes = [ Node( nodeid=nid, pred=nd[nid]['pred'], sortinfo=nd[nid]['si'], lnk=nd[nid]['lnk'], carg=nd[nid]['carg'] ) for nid in nids ] top = nids[0] if nids else None return cls(top=top, nodes=nodes, edges=edges)
def _find_basic_dependencies(m, eps): deps = {} for ep in eps: nid = ep.nodeid if ep.is_quantifier(): deps[nid] = [('BV', m.nodeid(ep.intrinsic_variable))] else: epdeps = [] args = m.outgoing_args(nid) for rargname in args: tgtnid = find_argument_target(m, nid, rargname) epdeps.append((rargname, tgtnid)) deps[nid] = epdeps return deps
[docs]def non_argument_modifiers(role='ARG1', only_connecting=True): """ Return a function that finds non-argument modifier dependencies. Args: role (str): the role that is assigned to the dependency only_connecting (bool): if `True`, only return dependencies that connect separate components in the basic dependencies; if `False`, all non-argument modifier dependencies are included Returns: a function with signature `func(xmrs, deps)` that returns a mapping of non-argument modifier dependencies Examples: The default function behaves like the LKB: >>> func = non_argument_modifiers() A variation is similar to DMRS's MOD/EQ links: >>> func = non_argument_modifiers(role="MOD", only_connecting=False) """ def func(xmrs, deps): edges = [] for src in deps: for _, tgt in deps[src]: edges.append((src, tgt)) components = _connected_components(xmrs.nodeids(), edges) ccmap = {} for i, component in enumerate(components): for n in component: ccmap[n] = i addl = {} if not only_connecting or len(components) > 1: lsh = xmrs.labelset_heads lblheads = {v: lsh(v) for v, vd in xmrs._vars.items() if 'LBL' in vd['refs']} for heads in lblheads.values(): if len(heads) > 1: first = heads[0] joined = set([ccmap[first]]) for other in heads[1:]: occ = ccmap[other] srt = var_sort(xmrs.args(other).get(role, 'u0')) needs_edge = not only_connecting or occ not in joined edge_available = srt == 'u' if needs_edge and edge_available: addl.setdefault(other, []).append((role, first)) joined.add(occ) return addl return func
def _unique_ids(eps, deps): # deps can be used to single out ep from set sharing ARG0s new_ids = ('_{}'.format(i) for i in count(start=1)) ids = {} used = {} for ep in eps: nid = ep.nodeid iv = ep.intrinsic_variable if iv is None or ep.is_quantifier(): iv = next(new_ids) ids[nid] = iv used.setdefault(iv, set()).add(nid) # give new ids when there's duplicates (use deps to select winner) for _id, nids in used.items(): if len(nids) > 1: nids = sorted( nids, key=lambda n: any(d in nids for _, d in deps.get(n, [])) ) for nid in nids[1:]: ids[nid] = next(new_ids) return ids def _find_root(m): try: top_hcons = m.hcon(m.top) return m.labelset_heads(top_hcons.lo)[0] except (KeyError, StopIteration): # try to find top? return None ## Serialization
[docs]def load(fh, single=False): """ Deserialize :class:`Eds` from a file (handle or filename) Args: fh (str, file): input filename or file object single (bool): if `True`, only return the first Xmrs object Returns: a generator of :class:`Eds` objects (unless the *single* option is `True`) """ if isinstance(fh, stringtypes): s = open(fh, 'r').read() else: s = fh.read() return loads(s, single=single)
[docs]def loads(s, single=False): """ Deserialize :class:`Eds` string representations Args: s (str): Eds string single (bool): if `True`, only return the first Xmrs object Returns: a generator of :class:`Eds` objects (unless the *single* option is `True`) """ es = deserialize(s) if single: return next(es) return es
[docs]def dump(destination, ms, single=False, properties=False, pretty_print=True, show_status=False, predicate_modifiers=False, **kwargs): """ Serialize Xmrs objects to Eds and write to a file Args: destination: filename or file object where data will be written ms: an iterator of :class:`~delphin.mrs.xmrs.Xmrs` objects to serialize (unless the *single* option is `True`) single (bool): if `True`, treat *ms* as a single :class:`~delphin.mrs.xmrs.Xmrs` object instead of as an iterator properties (bool): if `False`, suppress variable properties pretty_print (bool): if `True`, add newlines and indentation show_status (bool): if `True`, annotate disconnected graphs and nodes """ text = dumps(ms, single=single, properties=properties, pretty_print=pretty_print, show_status=show_status, predicate_modifiers=predicate_modifiers, **kwargs) if hasattr(destination, 'write'): print(text, file=destination) else: with open(destination, 'w') as fh: print(text, file=fh)
[docs]def dumps(ms, single=False, properties=False, pretty_print=True, show_status=False, predicate_modifiers=False, **kwargs): """ Serialize an Xmrs object to a Eds representation Args: ms: an iterator of :class:`~delphin.mrs.xmrs.Xmrs` objects to serialize (unless the *single* option is `True`) single (bool): if `True`, treat *ms* as a single :class:`~delphin.mrs.xmrs.Xmrs` object instead of as an iterator properties (bool): if `False`, suppress variable properties pretty_print (bool): if `True`, add newlines and indentation show_status (bool): if `True`, annotate disconnected graphs and nodes Returns: an :class:`Eds` string representation of a corpus of Xmrs """ if not pretty_print and kwargs.get('indent'): pretty_print = True if single: ms = [ms] return serialize( ms, properties=properties, pretty_print=pretty_print, show_status=show_status, predicate_modifiers=predicate_modifiers, **kwargs )
def load_one(fh, **kwargs): return load(fh, single=True, **kwargs) def loads_one(s, **kwargs): return loads(s, single=True, **kwargs) def dump_one(fh, m, **kwargs): dump(fh, m, single=True, **kwargs) def dumps_one(m, **kwargs): return dumps(m, single=True, **kwargs) def _make_eds(d): top, data = d nodes, edges = [], [] for node, _edges in data: nodes.append(node) edges.extend(_edges) return Eds(top=top, nodes=nodes, edges=edges) def _make_nodedata(d): nid = d[0] node = Node(nid, pred=d[1], sortinfo=d[4], lnk=d[2], carg=d[3]) edges = [(nid, rarg, tgt) for rarg, tgt in d[5]] return (node, edges) _COLON = regex(r'\s*:\s*', value=Ignore) _COMMA = regex(r',\s*') _SPACES = regex(r'\s+', value=Ignore) _SYMBOL = regex(r'[-+\w]+') _PRED = regex(r'((?!<-?\d|\("|\{|\[|\s).)+', value=Pred.surface_or_abstract) _EDS = nt('EDS', value=_make_eds) _TOP = opt(nt('TOP'), default=None) _TOPID = opt(_SYMBOL, default=None) _FLAG = opt(regex(r'\s*\(fragmented\)', value=Ignore)) _NODE = nt('NODE', value=_make_nodedata) _DSCN = opt(lit('|', value=Ignore)) _LNK = opt(nt('LNK', value=lambda d: Lnk.charspan(*d)), default=None) _CARG = opt(nt('CARG'), default=None) _PROPS = opt(nt('PROPS', value=lambda d: d[0] + d[1]), default=None) _EDGES = nt('EDGES') _TYPE = opt(_SYMBOL, value=lambda i: [(CVARSORT, i)], default=[]) _AVLIST = nt('AVLIST') _ATTRVAL = nt('ATTRVAL') _PROPLIST= opt(seq(_SPACES, _AVLIST, value=lambda d: d[0]), default=[]) _eds_parser = Peg( grammar=dict( start=delimited(_EDS, Spacing), EDS=bounded(regex(r'\{\s*'), seq(_TOP, nt('NODES')), regex(r'\s*\}')), TOP=seq(_TOPID, _COLON, _FLAG, Spacing, value=lambda d: d[0]), NODES=delimited(_NODE, Spacing), NODE=seq(_DSCN, _SYMBOL, _COLON, _PRED, _LNK, _CARG, _PROPS, _EDGES), LNK=bounded(lit('<'), seq(Integer, _COLON, Integer), lit('>')), CARG=bounded(lit('('), DQString, lit(')')), PROPS=bounded(lit('{'), seq(_TYPE, _PROPLIST), lit('}')), EDGES=bounded(lit('['), _AVLIST, lit(']')), AVLIST=delimited(_ATTRVAL, _COMMA), ATTRVAL=seq(_SYMBOL, _SPACES, _SYMBOL) ) ) def deserialize(s): for e in _eds_parser.parse(s).data: yield e eds = '{{{top}{flag}{delim}{ed_list}{enddelim}}}' ed = '{membership}{id}:{pred}{lnk}{carg}{props}[{dep_list}]' carg = '("{constant}")' proplist = '{{{varsort}{proplist}}}' dep = '{argname} {value}' def serialize(ms, properties=False, pretty_print=True, show_status=False, predicate_modifiers=False, **kwargs): delim = '\n' if pretty_print else ' ' return delim.join( _serialize_eds( m, properties, pretty_print, show_status, predicate_modifiers ) for m in ms ) def _serialize_eds(e, properties, pretty_print, show_status, predicate_modifiers): if not isinstance(e, Eds): e = Eds.from_xmrs(e, predicate_modifiers=predicate_modifiers) # do something predictable for empty EDS if len(e.nodeids()) == 0: return '{:\n}' if pretty_print else '{:}' # determine if graph is connected g = {n: set() for n in e.nodeids()} for n in e.nodeids(): for tgt in e.edges(n).values(): g[n].add(tgt) g[tgt].add(n) nidgrp = _bfs(g, start=e.top) status = '' if show_status and nidgrp != set(e.nodeids()): status = ' (fragmented)' delim = '\n' if pretty_print else ' ' connected = ' ' if pretty_print else '' disconnected = '|' if show_status else ' ' return eds.format( top=e.top + ':' if e.top is not None else ':', flag=status, delim=delim, ed_list=delim.join( ed.format( membership=connected if n.nodeid in nidgrp else disconnected, id=n.nodeid, pred=n.pred.short_form(), lnk=str(n.lnk) if n.lnk else '', carg=carg.format(constant=n.carg) if n.carg else '', props=proplist.format( varsort=n.cvarsort + ' ' if n.cvarsort else '', proplist=', '.join( '{} {}'.format(k, v) for k, v in sorted(n.properties.items()) ) ) if properties and n.sortinfo else '', dep_list=(', '.join( '{} {}'.format(rargname, tgt) for rargname, tgt in sorted( e.edges(n.nodeid).items(), key=lambda x: rargname_sortkey(x[0]) ) )), ) for n in e.nodes() ), enddelim='\n' if pretty_print else '' )