Source code for delphin.codecs.eds

"""
Serialization functions for the "native" EDS format.
"""

from pathlib import Path

from delphin import variable
from delphin.eds import EDS, EDSSyntaxError, Node
from delphin.lnk import Lnk
from delphin.sembase import property_priority, role_priority
from delphin.util import Lexer, _bfs

# Codec metadata: declares that this module serializes the 'eds'
# representation.
# NOTE(review): presumably read by PyDelphin's codec discovery machinery --
# the consumer is not visible in this file; confirm before changing keys.
CODEC_INFO = {
    'representation': 'eds',
}


def load(source, encoding='utf-8'):
    """
    Deserialize an EDS file (handle or filename) to EDS objects.

    Args:
        source: filename or file object; anything with a ``read``
            attribute is treated as an already-open file object
        encoding (str): if *source* is a filename, read the file with
            the given encoding; otherwise it is ignored. The default
            matches the default used by :func:`dump`, so files written
            by :func:`dump` can be read back regardless of the
            platform's locale encoding.
    Returns:
        a list of EDS objects
    """
    if hasattr(source, 'read'):
        # Already-open handle: the caller controls its encoding.
        return list(_decode(source))
    path = Path(source).expanduser()
    # Materialize the list inside the ``with`` so the lazy decoder is
    # fully consumed before the file is closed.
    with path.open(encoding=encoding) as fh:
        return list(_decode(fh))
def loads(s):
    """
    Deserialize an EDS string to EDS objects.

    Args:
        s (str): an EDS string; it is split into lines before decoding
    Returns:
        a list of EDS objects
    """
    return list(_decode(s.splitlines()))
def dump(es, destination, properties=True, lnk=True, show_status=False,
         indent=True, encoding='utf-8'):
    """
    Serialize EDS objects to an EDS file.

    The serialization itself is produced by :func:`dumps`; this function
    only writes the resulting string (followed by a newline) to
    *destination*.

    Args:
        es: iterator of :class:`~delphin.eds.EDS` objects to serialize
        destination: filename or file object; anything with a ``write``
            attribute is treated as an already-open file object
        properties: if `True`, encode variable properties
        lnk: if `False`, suppress surface alignments and strings
        show_status (bool): if `True`, indicate disconnected components
        indent: formatting switch, forwarded unchanged to :func:`dumps`
        encoding (str): if *destination* is a filename, write to the
            file with the given encoding; otherwise it is ignored
    """
    text = dumps(es,
                 properties=properties,
                 lnk=lnk,
                 show_status=show_status,
                 indent=indent)
    if hasattr(destination, 'write'):
        print(text, file=destination)
        return
    path = Path(destination).expanduser()
    with path.open('w', encoding=encoding) as fh:
        print(text, file=fh)
def dumps(es, properties=True, lnk=True, show_status=False, indent=True):
    """
    Serialize EDS objects to an EDS string.

    Args:
        es: iterator of :class:`~delphin.eds.EDS` objects to serialize
        properties: if `True`, encode variable properties
        lnk: if `False`, suppress surface alignments and strings
        show_status (bool): if `True`, indicate disconnected components
        indent: if `True` or a positive integer, format with newlines
            and indentation; if `0`, `False` or `None`, don't indent
    Returns:
        an EDS-serialization of the EDS objects; individual EDSs are
        separated by a blank line when indenting and by a single space
        otherwise. An empty *es* yields the empty string.
    """
    # Use a truthiness test rather than identity checks against
    # None/False so that ``indent=0`` disables indentation as documented.
    # The normalized boolean is what gets forwarded, so each EDS is
    # formatted consistently with the delimiter chosen here.
    indent = bool(indent)
    delim = '\n\n' if indent else ' '
    return delim.join(
        encode(e, properties=properties, lnk=lnk,
               show_status=show_status, indent=indent)
        for e in es)
def decode(s):
    """
    Deserialize a single EDS object from an EDS string.

    Args:
        s (str): an EDS string; it is split into lines and tokenized
            before being parsed
    Returns:
        the EDS object parsed from the start of the token stream
    """
    return _decode_eds(_EDSLexer.lex(s.splitlines()))
def encode(e, properties=True, lnk=True, show_status=False, indent=True):
    """
    Serialize an EDS object to an EDS string.

    Args:
        e: an EDS object
        properties (bool): if `False`, suppress variable properties
        lnk: if `False`, suppress surface alignments and strings
        show_status (bool): if `True`, indicate disconnected components
        indent: if `True` or a positive integer, format with newlines
            and indentation; if `0`, `False` or `None`, don't indent
    Returns:
        an EDS-serialization of the EDS object
    """
    # Collapse *indent* to a plain boolean by truthiness. Identity checks
    # against None/False would treat ``0`` as "indent", contradicting the
    # documented contract above.
    return _encode_eds(e, properties, lnk, show_status, bool(indent))
##############################################################################
##############################################################################
# Decoding

# Token definitions for the native EDS syntax. The last pattern matches any
# single non-space character and is labelled UNEXPECTED.
# NOTE(review): pattern order presumably sets matching priority -- confirm
# against delphin.util.Lexer before reordering.
_EDSLexer = Lexer(
    tokens=[
        (r'\#([^\s\{]+)\s*(?=\{|$)', 'IDENTIFIER'),
        (r'\{', 'LBRACE:{'),
        (r'\}', 'RBRACE:}'),
        (r'\((?:cyclic *)?(?:fragmented)?\)', 'GRAPHSTATUS'),
        (r'\|', 'NODESTATUS:|'),
        (r'<(?:-?\d+[:#]-?\d+|@\d+|\d+(?: +\d+)*)>', 'LNK:a lnk value'),
        (r'\("([^"\\]*(?:\\.[^"\\]*)*)"\)', 'CARG:a string'),
        (r':', 'COLON::'),
        (r',', 'COMMA:,'),
        (r'\[', 'LBRACKET:['),
        (r'\]', 'RBRACKET:]'),
        (r'[^ \n:,<\(\[\]\{\}]+', 'SYMBOL:a symbol'),
        (r'[^\s]', 'UNEXPECTED'),
    ],
    error_class=EDSSyntaxError,
)

# Module-level aliases for the token types used by the parser below.
IDENTIFIER = _EDSLexer.tokentypes.IDENTIFIER
LBRACE = _EDSLexer.tokentypes.LBRACE
RBRACE = _EDSLexer.tokentypes.RBRACE
GRAPHSTATUS = _EDSLexer.tokentypes.GRAPHSTATUS
NODESTATUS = _EDSLexer.tokentypes.NODESTATUS
LNK = _EDSLexer.tokentypes.LNK
CARG = _EDSLexer.tokentypes.CARG
COLON = _EDSLexer.tokentypes.COLON
COMMA = _EDSLexer.tokentypes.COMMA
LBRACKET = _EDSLexer.tokentypes.LBRACKET
RBRACKET = _EDSLexer.tokentypes.RBRACKET
SYMBOL = _EDSLexer.tokentypes.SYMBOL


def _decode(lineiter):
    """
    Lazily yield one EDS object per serialized EDS in *lineiter*.

    Args:
        lineiter: iterable of lines to tokenize
    Yields:
        EDS objects, in input order
    """
    lexer = _EDSLexer.lex(lineiter)
    try:
        while lexer.peek():
            yield _decode_eds(lexer)
    except StopIteration:
        # NOTE(review): presumably raised by the lexer when the tokens run
        # out -- this is how the loop terminates; confirm against
        # delphin.util.Lexer.
        return


def _decode_eds(lexer):
    """
    Parse a single EDS (optional identifier, braces, top, nodes).

    Args:
        lexer: token stream positioned at the start of an EDS
    Returns:
        an EDS built from the parsed top, nodes, and identifier
    """
    identifier = lexer.accept_type(IDENTIFIER)
    lexer.expect_type(LBRACE)

    # After the LBRACE, the upcoming tokens determine the top:
    #
    #   :                       1st is COLON        -> None
    #   (fragmented)            1st is GRAPHSTATUS  -> None
    #   }                       1st is RBRACE       -> None
    #   |                       1st is NODESTATUS   -> None
    #   <sym1> : (fragmented)   3rd is GRAPHSTATUS  -> <sym1>
    #   <sym1> : |              3rd is NODESTATUS   -> <sym1>
    #   <sym1> : <sym2> :       4th is COLON        -> <sym1>
    #   <sym1> : <sym2> ...     otherwise           -> None
    top = None
    leading = lexer.peek()[0]
    if leading in (COLON, GRAPHSTATUS, RBRACE, NODESTATUS):
        lexer.accept_type(COLON)
        lexer.accept_type(GRAPHSTATUS)
    elif (lexer.peek(2)[0] in (GRAPHSTATUS, NODESTATUS)
          or lexer.peek(3)[0] == COLON):
        top, _ = lexer.expect_type(SYMBOL, COLON)
        lexer.accept_type(GRAPHSTATUS)

    nodes = []
    while lexer.peek()[0] != RBRACE:
        # An optional node-status marker may precede each node.
        lexer.accept_type(NODESTATUS)
        node_id, _ = lexer.expect_type(SYMBOL, COLON)
        nodes.append(_decode_node(node_id, lexer))
    lexer.expect_type(RBRACE)

    return EDS(top=top, nodes=nodes, identifier=identifier)


def _decode_node(start, lexer):
    """
    Parse the remainder of a node whose identifier has already been read.

    Args:
        start: the node identifier
        lexer: token stream positioned at the node's predicate
    Returns:
        a Node with a lowercased predicate
    """
    predicate = lexer.expect_type(SYMBOL).lower()
    lnk = Lnk(lexer.accept_type(LNK))
    carg = lexer.accept_type(CARG)
    nodetype, properties = _decode_properties(start, lexer)
    edges = _decode_edges(start, lexer)
    return Node(start, predicate, nodetype, edges, properties, carg, lnk)


def _decode_properties(start, lexer):
    """
    Parse an optional ``{type PROP val, ...}`` property block.

    Args:
        start: the node identifier (not used here)
        lexer: token stream positioned after the node's optional CARG
    Returns:
        a ``(nodetype, properties)`` pair; ``(None, {})`` when no block is
        present. Property names are uppercased and values lowercased.
    """
    if not lexer.accept_type(LBRACE):
        return None, {}

    nodetype = lexer.expect_type(SYMBOL)
    properties = {}
    if lexer.peek()[0] != RBRACE:
        more = True
        while more:
            name, value = lexer.expect_type(SYMBOL, SYMBOL)
            properties[name.upper()] = value.lower()
            # Keep reading only while a comma follows the pair.
            more = bool(lexer.accept_type(COMMA))
    lexer.expect_type(RBRACE)
    return nodetype, properties


def _decode_edges(start, lexer):
    """
    Parse the mandatory ``[ROLE target, ...]`` edge list.

    Args:
        start: the node identifier (not used here)
        lexer: token stream positioned at the opening bracket
    Returns:
        a dict mapping uppercased role names to target identifiers
    """
    lexer.expect_type(LBRACKET)
    edges = {}
    if lexer.peek()[0] != RBRACKET:
        more = True
        while more:
            role, target = lexer.expect_type(SYMBOL, SYMBOL)
            edges[role.upper()] = target
            # Keep reading only while a comma follows the pair.
            more = bool(lexer.accept_type(COMMA))
    lexer.expect_type(RBRACKET)
    return edges


##############################################################################
##############################################################################
# Encoding

def _encode_eds(e, properties, lnk, show_status, indent):
    """
    Serialize one EDS object.

    Args:
        e: the EDS object to serialize
        properties: passed to the node encoder to enable property blocks
        lnk: passed to the node encoder to enable lnk values
        show_status: if true, emit ``(fragmented)`` and ``|`` markers for
            nodes not reached from the top
        indent: if true, put the top and each node on their own lines
    Returns:
        the EDS string
    """
    opening = '{'
    if e.identifier:
        id_sep = '\n' if indent else ' '
        opening = f'#{e.identifier}{id_sep}{{'
    closing = '\n}' if indent else '}'

    # do something predictable for empty EDS
    if len(e.nodes) == 0:
        return opening + closing

    if indent:
        delim, connected = '\n', ' '
    else:
        delim, connected = ' ', ''
    disconnected = '|' if show_status else ' '

    # Build an undirected adjacency map to determine which nodes are
    # connected to the top.
    graph = {node.id: set() for node in e.nodes}
    for node in e.nodes:
        for target in node.edges.values():
            graph[node.id].add(target)
            graph[target].add(node.id)
    reachable = _bfs(graph, start=e.top)

    header = []
    if e.top is not None:
        header.append(e.top + ':')
    if show_status and reachable != set(graph):
        header.append('(fragmented)')

    parts = []
    # When indenting, the header line is emitted even if it is empty.
    if header or indent:
        parts.append(' '.join(header))
    parts.extend(
        (connected if node.id in reachable else disconnected)
        + _encode_node(node, properties, lnk)
        for node in e.nodes
    )

    return opening + delim.join(parts) + closing


def _encode_node(node, properties, lnk):
    """
    Serialize one node as ``id:pred<lnk>("carg"){type props}[edges]``.

    Args:
        node: the node to serialize
        properties: if true, include the type/properties block when the
            node has a type or any properties
        lnk: if true, include the node's lnk value when it is truthy
    Returns:
        the node string
    """
    parts = [node.id, ':', node.predicate]

    if lnk and node.lnk:
        parts.append(str(node.lnk))

    if node.carg is not None:
        parts.append(f'("{node.carg}")')

    if properties and (node.properties or node.type):
        parts.append('{')
        parts.append(node.type or variable.UNSPECIFIC)
        if node.properties:
            ordered = sorted(node.properties, key=property_priority)
            rendered = [f'{prop} {node.properties[prop]}' for prop in ordered]
            parts.append(' ' + ', '.join(rendered))
        parts.append('}')

    edges = node.edges
    edge_text = ', '.join(
        f'{role} {edges[role]}'
        for role in sorted(edges, key=role_priority)
    )
    parts.extend(['[', edge_text, ']'])

    return ''.join(parts)