Source code for delphin.codecs.dmrspenman

# -*- coding: utf-8 -*-

"""
DMRS-PENMAN serialization and deserialization.
"""

from pathlib import Path
import logging

import penman

from delphin.exceptions import PyDelphinException
from delphin.lnk import Lnk
from delphin.dmrs import DMRS, Node, Link, CVARSORT
from delphin.dmrs._dmrs import FIRST_NODE_ID
from delphin.sembase import property_priority
from delphin.util import _bfs


logger = logging.getLogger(__name__)


# Codec metadata: 'representation' names the semantic representation
# this module (de)serializes (DMRS).
# NOTE(review): presumably read by PyDelphin's codec discovery -- confirm.
CODEC_INFO = {
    'representation': 'dmrs',
}


def load(source):
    """
    Deserialize PENMAN graphs from a file (handle or filename)

    Args:
        source: filename or file object
    Returns:
        a list of DMRS objects
    Raises:
        PyDelphinException: if Penman cannot decode the contents
    """
    is_handle = hasattr(source, 'read')
    if not is_handle:
        # anything without a read() method is treated as a path, and a
        # leading ~ is expanded to the user's home directory
        source = Path(source).expanduser()
    try:
        penman_graphs = penman.load(source)
    except penman.PenmanError as exc:
        raise PyDelphinException('could not decode with Penman') from exc
    else:
        return [from_triples(graph.triples) for graph in penman_graphs]
def loads(s):
    """
    Deserialize PENMAN graphs from a string

    Args:
        s (str): serialized PENMAN graphs
    Returns:
        a list of DMRS objects
    Raises:
        PyDelphinException: if Penman cannot decode the string
    """
    try:
        penman_graphs = penman.loads(s)
    except penman.PenmanError as exc:
        raise PyDelphinException('could not decode with Penman') from exc
    else:
        # each Penman graph becomes one DMRS, in the order it was read
        return [from_triples(graph.triples) for graph in penman_graphs]
def dump(ds, destination, properties=False, lnk=True,
         indent=False, encoding='utf-8'):
    """
    Serialize DMRS objects to a PENMAN file.

    Args:
        ds: iterator of :class:`~delphin.dmrs.DMRS` objects to
            serialize
        destination: filename or file object
        properties: if `True`, encode variable properties
        lnk: if `False`, suppress surface alignments and strings
        indent: if `True`, adaptively indent; if `False` or `None`,
            don't indent; if a non-negative integer N, indent N spaces
            per level
        encoding (str): if *destination* is a filename, write to the
            file with the given encoding; otherwise it is ignored
    """
    serialized = dumps(ds, properties=properties, lnk=lnk, indent=indent)
    if not hasattr(destination, 'write'):
        # not a writable handle, so treat it as a path and open it here
        path = Path(destination).expanduser()
        with path.open('w', encoding=encoding) as fh:
            print(serialized, file=fh)
        return
    print(serialized, file=destination)
def dumps(ds, properties=False, lnk=True, indent=False):
    """
    Serialize DMRS objects to a PENMAN string.

    Args:
        ds: iterator of :class:`~delphin.dmrs.DMRS` objects to
            serialize
        properties: if `True`, encode variable properties
        lnk: if `False`, suppress surface alignments and strings
        indent: if `True`, adaptively indent; if `False` or `None`,
            don't indent; if a non-negative integer N, indent N spaces
            per level
    Returns:
        a PENMAN-serialization of the DMRS objects
    Raises:
        PyDelphinException: if Penman cannot serialize the graphs
    """
    # translate the boolean flags into the values handed to Penman:
    # -1 for adaptive indentation and None for no indentation
    if indent is True:
        indent = -1
    elif indent is False:
        indent = None
    graphs = [
        penman.Graph(to_triples(d, properties=properties, lnk=lnk))
        for d in ds
    ]
    try:
        return penman.dumps(graphs, indent=indent)
    except penman.PenmanError as exc:
        # this is the serialization path, so report an encoding failure
        raise PyDelphinException('could not encode with Penman') from exc
def decode(s):
    """
    Deserialize a DMRS object from a PENMAN string.

    Args:
        s (str): a single serialized PENMAN graph
    Returns:
        a DMRS object
    Raises:
        PyDelphinException: if Penman cannot decode the string
    """
    try:
        graph = penman.decode(s)
    except penman.PenmanError as exc:
        raise PyDelphinException('could not decode with Penman') from exc
    else:
        return from_triples(graph.triples)
def encode(d, properties=True, lnk=True, indent=False):
    """
    Serialize a DMRS object to a PENMAN string.

    Args:
        d: a DMRS object
        properties (bool): if `False`, suppress variable properties
        lnk: if `False`, suppress surface alignments and strings
        indent (bool, int): if `True` or an integer value, add
            newlines and indentation
    Returns:
        a PENMAN-serialization of the DMRS object
    Raises:
        PyDelphinException: if Penman cannot serialize the graph
    """
    # translate the boolean flags into the values handed to Penman:
    # -1 when `True` is given and None when `False` is given
    if indent is True:
        indent = -1
    elif indent is False:
        indent = None
    triples = to_triples(d, properties=properties, lnk=lnk)
    g = penman.Graph(triples)
    try:
        return penman.encode(g, indent=indent)
    except penman.PenmanError as exc:
        # this is the serialization path, so report an encoding failure
        raise PyDelphinException('could not encode with Penman') from exc
def to_triples(d, properties=True, lnk=True):
    """
    Encode *d* as triples suitable for PENMAN serialization.

    Only nodes and links in the component connected to the top node
    are encoded; if anything is left out, a warning is logged.

    Args:
        d: a DMRS object
        properties: if `False`, suppress node properties
        lnk: if `False`, suppress the `:lnk` triples
    Returns:
        a list of (source, relation, target) triples
    """
    # undirected adjacency map, used to find what is connected to the top
    adjacency = {node.id: set() for node in d.nodes}
    for link in d.links:
        adjacency[link.start].add(link.end)
        adjacency[link.end].add(link.start)
    reachable = _bfs(adjacency, start=d.top)

    # PENMAN variables: 'q' for quantifiers, otherwise the node type
    # (or '_' when there is none), followed by the 1-based node position
    quantifier_ids = {
        node.id for node in d.nodes if d.is_quantifier(node.id)
    }
    variables = {}
    for position, node in enumerate(d.nodes, 1):
        prefix = 'q' if node.id in quantifier_ids else (node.type or '_')
        variables[node.id] = '{}{}'.format(prefix, position)

    triples = []
    fully_encoded = True
    # the sort is stable, so the top node moves to the front and the
    # remaining nodes keep their original order
    for node in sorted(d.nodes, key=lambda n: d.top != n.id):
        if node.id not in reachable:
            fully_encoded = False
            continue
        var = variables[node.id]
        triples.append((var, ':instance', node.predicate))
        if lnk and node.lnk is not None:
            triples.append((var, ':lnk', '"{}"'.format(str(node.lnk))))
        if node.carg is not None:
            triples.append((var, ':carg', '"{}"'.format(node.carg)))
        if node.type:
            triples.append((var, ':' + CVARSORT, node.type))
        if properties:
            triples.extend(
                (var, ':' + key.lower(), node.properties[key])
                for key in sorted(node.properties, key=property_priority)
            )

    # note: no explicit top triple is emitted; the top node is simply
    # the first one encoded

    for link in d.links:
        if link.start in reachable and link.end in reachable:
            role = ':{}-{}'.format(link.role.upper(), link.post)
            triples.append(
                (variables[link.start], role, variables[link.end])
            )

    if not fully_encoded:
        logger.warning(
            'disconnected graph cannot be completely encoded: %r', d)
    return triples
def from_triples(triples):
    """
    Decode triples, as from :func:`to_triples`, into a DMRS object.

    Unless a triple with a `None` source and the relation `top` names
    the top explicitly, the source of the first triple is the top.

    Args:
        triples: iterable of (source, relation, target) triples
    Returns:
        a DMRS object; its top is `None` if *triples* is empty
    """
    top = lnk = surface = identifier = None
    nids, nd, edges = [], {}, []
    for src, rel, tgt in triples:
        rel = rel.lstrip(':')
        # the explicit-top check must come before the str() conversion
        # below, which would turn a None source into the string 'None'
        if src is None and rel == 'top':
            top = str(tgt)
            continue
        src, tgt = str(src), str(tgt)  # in case penman converts ids to ints
        if src not in nd:
            if top is None:
                top = src
            nids.append(src)
            nd[src] = {'pred': None, 'lnk': None, 'type': None,
                       'props': {}, 'carg': None}
        if rel == 'instance':
            nd[src]['pred'] = tgt
        elif rel == 'lnk':
            # only character spans (e.g., "<0:5>") are parsed here
            cfrom, cto = tgt.strip('"<>').split(':')
            nd[src]['lnk'] = Lnk.charspan(int(cfrom), int(cto))
        elif rel == 'carg':
            # drop surrounding double quotes; slices rather than indices
            # so an empty string does not raise IndexError
            if tgt[:1] == '"' and tgt[-1:] == '"':
                tgt = tgt[1:-1]
            nd[src]['carg'] = tgt
        elif rel == CVARSORT:
            nd[src]['type'] = tgt
        elif rel.islower():
            # any other lowercase relation is a node property
            nd[src]['props'][rel] = tgt
        else:
            # links are encoded as ROLE-POST, e.g., as built by to_triples
            rargname, post = rel.rsplit('-', 1)
            edges.append((src, tgt, rargname, post))
    # renumber the PENMAN variables to consecutive integer node ids
    nidmap = {nid: FIRST_NODE_ID + i for i, nid in enumerate(nids)}
    nodes = [
        Node(id=nidmap[nid],
             predicate=nd[nid]['pred'],
             type=nd[nid]['type'],
             properties=nd[nid]['props'],
             lnk=nd[nid]['lnk'],
             carg=nd[nid]['carg'])
        for nid in nids
    ]
    links = [Link(nidmap[s], nidmap[t], r, p) for s, t, r, p in edges]
    # with no triples there is no top to look up
    top_id = nidmap[top] if top is not None else None
    return DMRS(
        top=top_id,
        nodes=nodes,
        links=links,
        lnk=lnk,
        surface=surface,
        identifier=identifier
    )