# Source code for delphin.interfaces.ace


"""
An interface for the ACE processor.

This module provides classes and functions for managing interactive
communication with an open
`ACE <http://sweaglesw.org/linguistics/ace/>`_ process.

Note:
  ACE is required for the functionality in this module, but it is not
  included with PyDelphin. Pre-compiled binaries are available for
  Linux and MacOS: http://sweaglesw.org/linguistics/ace/

  For installation instructions, see:
  http://moin.delph-in.net/AceInstall

The :class:`AceParser`, :class:`AceTransferer`, and
:class:`AceGenerator` classes are used for parsing, transferring, and
generating with ACE. All are subclasses of :class:`AceProcess`, which
connects to ACE in the background, sends it data via its stdin, and
receives responses via its stdout. Responses from ACE are interpreted
so the data is more accessible in Python.

Warning:
  Instantiating :class:`AceParser`, :class:`AceTransferer`, or
  :class:`AceGenerator` opens ACE in a subprocess, so take care to
  close the process (:meth:`AceProcess.close`) when finished or,
  alternatively, instantiate the class in a context manager.

Interpreted responses are stored in a dictionary-like
:class:`~delphin.interfaces.base.ParseResponse` object. When queried
as a dictionary, these objects return the raw response strings. When
queried via its methods, the PyDelphin models of the data are returned.
The response objects may contain a number of
:class:`~delphin.interfaces.ParseResult` objects. These objects
similarly provide raw-string access via dictionary keys and
PyDelphin-model access via methods. Here is an example of parsing a
sentence with :class:`AceParser`:

    >>> with AceParser('erg-1214-x86-64-0.9.24.dat') as parser:
    ...     response = parser.interact('Cats sleep.')
    ...     print(response.result(0)['mrs'])
    ...     print(response.result(0).mrs())
    ... 
    [ LTOP: h0 INDEX: e2 [ e SF: prop TENSE: pres MOOD: indicative PROG: - PERF: - ] RELS: < [ udef_q<0:4> LBL: h4 ARG0: x3 [ x PERS: 3 NUM: pl IND: + ] RSTR: h5 BODY: h6 ]  [ _cat_n_1<0:4> LBL: h7 ARG0: x3 ]  [ _sleep_v_1<5:11> LBL: h1 ARG0: e2 ARG1: x3 ] > HCONS: < h0 qeq h1 h5 qeq h7 > ]
    <Xmrs object (udef cat sleep) at 139880862399696>

Functions exist for non-interactive communication with ACE:
:func:`parse` and :func:`parse_from_iterable` open and close an
:class:`AceParser` instance; :func:`transfer` and
:func:`transfer_from_iterable` open and close an :class:`AceTransferer`
instance; and :func:`generate` and :func:`generate_from_iterable` open
and close an :class:`AceGenerator` instance. Note that these functions
open a new ACE subprocess every time they are called, so if you have
many items to process, it is more efficient to use
:func:`parse_from_iterable`, :func:`transfer_from_iterable`, or
:func:`generate_from_iterable` than the single-item versions, or to
interact with the :class:`AceProcess` subclass instances directly.

"""

import logging
import os
import argparse
import re
from subprocess import (
    check_call,
    check_output,
    CalledProcessError,
    Popen,
    PIPE
)
from platform import platform   # portable system information
from getpass import getuser     # portable way to get username
from socket import gethostname  # portable way to get host name
from datetime import datetime
import locale; locale.setlocale(locale.LC_ALL, '')
encoding = locale.getpreferredencoding(False)

from delphin.interfaces.base import ParseResponse, Processor
from delphin.util import SExpr, stringtypes
from delphin.__about__ import __version__ as pydelphin_version
from delphin.exceptions import PyDelphinException


class AceProcessError(PyDelphinException):
    """
    Error raised when the ACE subprocess is unusable.

    This signals that the ACE process has crashed (e.g., it closed on
    startup with a non-zero exit code) and cannot be recovered.
    """


class AceProcess(Processor):
    """
    The base class for interfacing ACE.

    This manages most subprocess communication with ACE, but does not
    interpret the response returned via ACE's stdout. Subclasses
    override the :meth:`receive` method to interpret the task-specific
    response formats.

    Args:
        grm (str): path to a compiled grammar image
        cmdargs (list, optional): a list of command-line arguments
            for ACE; note that arguments and their values should be
            separate entries, e.g. `['-n', '5']`; the list is copied,
            so the caller's list is never modified
        executable (str, optional): the path to the ACE binary; if
            `None`, ACE is assumed to be callable via `ace`
        env (dict): environment variables to pass to the ACE
            subprocess
        tsdbinfo (bool): if `True` and ACE's version is compatible,
            all information ACE reports for [incr tsdb()] processing
            is gathered and returned in the response
    Raises:
        ValueError: if *grm* is not an existing file or *cmdargs*
            contains options that are not recognized
        AceProcessError: if the ACE process closes on startup with a
            non-zero exit code
    """

    #: The name of the task performed by the processor (`'parse'`,
    #: `'transfer'`, or `'generate'`). This is useful when a function,
    #: such as :meth:`delphin.itsdb.TestSuite.process`, accepts any
    #: :class:`AceProcess` instance.
    task = None

    # arguments always given to ACE for the task (set by subclasses)
    _cmdargs = []
    # compiled patterns that, matched in order, mark the end of a response
    _termini = []

    def __init__(self, grm, cmdargs=None, executable=None, env=None,
                 tsdbinfo=True, **kwargs):
        if not os.path.isfile(grm):
            raise ValueError("Grammar file %s does not exist." % grm)
        self.grm = grm

        # Copy the list: options are appended below, and mutating the
        # caller's list would make it fail validation if it were reused
        # for another processor ('--tsdb-notes' is not a user option).
        self.cmdargs = list(cmdargs or [])
        # validate the arguments
        _ace_argparser.parse_args(self.cmdargs)

        self.executable = executable or 'ace'
        ace_version = self.ace_version
        if ace_version >= (0, 9, 14):
            self.cmdargs.append('--tsdb-notes')
        if tsdbinfo and ace_version >= (0, 9, 24):
            self.cmdargs.extend(['--tsdb-stdout', '--report-labels'])
            self.receive = self._tsdb_receive
        else:
            self.receive = self._default_receive
        self.env = env or os.environ
        self._run_id = -1
        self.run_infos = []
        self._open()

    @property
    def ace_version(self):
        """The version of the specified ACE binary."""
        return _ace_version(self.executable)

    @property
    def run_info(self):
        """Contextual information about the running process."""
        return self.run_infos[-1]

    def _open(self):
        """Start the ACE subprocess and record a new run-info entry."""
        self._p = Popen(
            [self.executable, '-g', self.grm] + self._cmdargs + self.cmdargs,
            stdin=PIPE,
            stdout=PIPE,
            env=self.env,
            universal_newlines=True
        )
        self._run_id += 1
        self.run_infos.append({
            'run-id': self._run_id,
            'application': 'ACE {} via PyDelphin v{}'.format(
                '.'.join(map(str, self.ace_version)), pydelphin_version),
            'environment': ' '.join(self.cmdargs),
            'user': getuser(),
            'host': gethostname(),
            'os': platform(),
            'start': datetime.now()
        })
        if self._p.poll() is not None and self._p.returncode != 0:
            raise AceProcessError('Process closed on startup; see <stderr>.')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # don't try to handle any exceptions

    def _result_lines(self, termini=None):
        """
        Read stdout until every pattern in *termini* has matched in order.

        Args:
            termini (list, optional): compiled patterns marking the end
                of a response; defaults to the class's `_termini`
        Returns:
            the non-empty, right-stripped lines that were read
        """
        poll = self._p.poll
        next_line = self._p.stdout.readline

        if termini is None:
            termini = self._termini
        i, end = 0, len(termini)

        lines = []
        while i < end:
            s = next_line()
            if s == '' and poll() is not None:
                logging.info(
                    'Process closed unexpectedly; attempting to reopen'
                )
                self.close()
                self._open()
                break
            # The 'run' note should appear when the process is opened, but
            # handle it here to avoid potential deadlocks if it gets buffered
            elif s.startswith('NOTE: tsdb run:'):
                self._read_run_info(s.rstrip())
            # the rest should be normal result lines
            else:
                lines.append(s.rstrip())
                # always test the terminus currently being waited for
                if termini[i].search(s):
                    i += 1
        return [line for line in lines if line != '']

    def _read_run_info(self, line):
        """Merge the data of a 'NOTE: tsdb run:' line into `run_info`."""
        assert line.startswith('NOTE: tsdb run:')
        for key, value in _sexpr_data(line[15:].lstrip()):
            if key == ':application':
                continue  # PyDelphin sets 'application'
            self.run_info[key.lstrip(':')] = value

    def send(self, datum):
        """
        Send *datum* (e.g. a sentence or MRS) to ACE.

        Warning:
          Sending data without reading (e.g., via :meth:`receive`) can
          fill the buffer and cause data to be lost. Use the
          :meth:`interact` method for most data-processing tasks with
          ACE.
        """
        try:
            self._p.stdin.write((datum.rstrip() + '\n'))
            self._p.stdin.flush()
        except (IOError, OSError):  # ValueError if file was closed manually
            logging.info(
                'Attempted to write to a closed process; attempting to reopen'
            )
            self._open()
            self._p.stdin.write((datum.rstrip() + '\n'))
            self._p.stdin.flush()

    def receive(self):
        """
        Return the stdout response from ACE.

        On instances, this name is rebound in `__init__` to either
        `_tsdb_receive` or `_default_receive`.

        Warning:
          Reading beyond the last line of stdout from ACE can cause
          the process to hang while it waits for the next line. Use
          the :meth:`interact` method for most data-processing tasks
          with ACE.
        """
        raise NotImplementedError()

    def _default_receive(self):
        """Interpret a plain response; implemented by subclasses."""
        raise NotImplementedError()

    def _tsdb_receive(self):
        """Interpret a response written with `--tsdb-stdout`."""
        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)
        # ACE 0.9.24 on Mac puts superfluous newlines
        line = ' '.join(lines)
        response = _tsdb_response(response, line)
        return response

    def interact(self, datum):
        """
        Send *datum* to ACE and return the response.

        This is the recommended method for sending and receiving data
        to/from an ACE process as it reduces the chances of
        over-filling or reading past the end of the buffer. It also
        performs a simple validation of the input to help ensure that
        one complete item is processed at a time.

        If input item identifiers need to be tracked throughout
        processing, see :meth:`process_item`.

        Args:
            datum (str): the input sentence or MRS
        Returns:
            :class:`~delphin.interfaces.ParseResponse`
        """
        validated = self._validate_input(datum)
        if validated:
            self.send(validated)
            result = self.receive()
        else:
            # build a response locally instead of sending bad input to ACE
            result, _ = _make_response(
                [('NOTE: PyDelphin could not validate the input and '
                  'refused to send it to ACE'),
                 'SKIP: {}'.format(datum)],
                self.run_info)
        result['input'] = datum
        return result

    def process_item(self, datum, keys=None):
        """
        Send *datum* to ACE and return the response with context.

        The *keys* parameter can be used to track item identifiers
        through an ACE interaction. If the `task` member is set on
        the AceProcess instance (or one of its subclasses), it is
        kept in the response as well.

        Args:
            datum (str): the input sentence or MRS
            keys (dict): a mapping of item identifier names and values
        Returns:
            :class:`~delphin.interfaces.ParseResponse`
        """
        response = self.interact(datum)
        if keys is not None:
            response['keys'] = keys
        if 'task' not in response and self.task is not None:
            response['task'] = self.task
        return response

    def close(self):
        """
        Close the ACE process and return the process's exit code.
        """
        self.run_info['end'] = datetime.now()
        self._p.stdin.close()
        # drain whatever ACE prints while shutting down
        for line in self._p.stdout:
            if line.startswith('NOTE: tsdb run:'):
                self._read_run_info(line)
            else:
                logging.debug('ACE cleanup: {}'.format(line.rstrip()))
        retval = self._p.wait()
        return retval
class AceParser(AceProcess):
    """
    A class for managing parse requests with ACE.

    See :class:`AceProcess` for initialization parameters.
    """

    task = 'parse'
    _termini = [re.compile(r'^$'), re.compile(r'^$')]

    def _validate_input(self, datum):
        # An empty (whitespace-only) string is falsy, so returning the
        # stripped input doubles as the "is this parseable" check.
        return datum.strip()

    def _default_receive(self):
        """Interpret a plain response of 'MRS ; derivation' lines."""
        raw_lines = self._result_lines()
        response, content = _make_response(raw_lines, self.run_info)
        fields = ('mrs', 'derivation')
        results = []
        for entry in content:
            values = map(str.strip, entry.split(' ; '))
            results.append(dict(zip(fields, values)))
        response['results'] = results
        return response
class AceTransferer(AceProcess):
    """
    A class for managing transfer requests with ACE.

    Note that currently the `tsdbinfo` parameter must be set to `False`
    as ACE is not yet able to provide detailed information for
    transfer results.

    See :class:`AceProcess` for initialization parameters.

    Raises:
        ValueError: if *tsdbinfo* is true
    """

    task = 'transfer'
    _termini = [re.compile(r'^$')]

    def __init__(self, grm, cmdargs=None, executable=None, env=None,
                 tsdbinfo=False, **kwargs):
        # disallow --tsdb-stdout
        if tsdbinfo:
            raise ValueError(
                'tsdbinfo=True is not available for AceTransferer'
            )
        # Build a filtered copy rather than calling list.remove(): this
        # leaves the caller's list untouched and drops every occurrence
        # of the option, not just the first.
        cmdargs = [arg for arg in (cmdargs or []) if arg != '--tsdb-stdout']
        AceProcess.__init__(
            self, grm, cmdargs=cmdargs, executable=executable, env=env,
            tsdbinfo=False, **kwargs
        )

    def _validate_input(self, datum):
        return _possible_mrs(datum)

    def _default_receive(self):
        """Interpret a plain response with one MRS per content line."""
        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)
        response['results'] = [{'mrs': line.strip()} for line in lines]
        return response
class AceGenerator(AceProcess):
    """
    A class for managing realization requests with ACE.

    See :class:`AceProcess` for initialization parameters.
    """

    task = 'generate'
    _cmdargs = ['-e', '--tsdb-notes']
    _termini = [re.compile(r'NOTE: tsdb parse: ')]

    def _validate_input(self, datum):
        return _possible_mrs(datum)

    def _default_receive(self):
        """
        Interpret a plain response of realizations.

        Each result starts with a sentence line; it may be followed by
        a 'DTREE = ' line and/or an 'MRS = ' line when the corresponding
        `--show-realization-*` options were given.
        """
        show_tree = '--show-realization-trees' in self.cmdargs
        show_mrs = '--show-realization-mrses' in self.cmdargs

        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)

        i, numlines = 0, len(lines)
        results = []
        while i < numlines:
            result = {'SENT': lines[i].strip()}
            i += 1
            # The bounds checks guard against a final sentence that is
            # not followed by the optional DTREE/MRS lines.
            if (show_tree and i < numlines
                    and lines[i].startswith('DTREE = ')):
                result['derivation'] = lines[i][8:].strip()
                i += 1
            if (show_mrs and i < numlines
                    and lines[i].startswith('MRS = ')):
                result['mrs'] = lines[i][6:].strip()
                i += 1
            results.append(result)
        response['results'] = results
        return response

    def _tsdb_receive(self):
        """Interpret a response written with `--tsdb-stdout`."""
        # with --tsdb-stdout, the notes line is not printed
        lines = self._result_lines(termini=[re.compile(r'\(:results \.')])
        response, lines = _make_response(lines, self.run_info)
        # ACE 0.9.24 on Mac puts superfluous newlines
        line = ' '.join(lines)
        response = _tsdb_response(response, line)
        return response
def compile(cfg_path, out_path, executable=None, env=None, log=None):
    """
    Use ACE to compile a grammar.

    Args:
        cfg_path (str): the path to the ACE config file
        out_path (str): the path where the compiled grammar will be
            written
        executable (str, optional): the path to the ACE binary; if
            `None`, the `ace` command will be used
        env (dict, optional): environment variables to pass to the ACE
            subprocess
        log (file, optional): if given, the file, opened for writing,
            or stream to write ACE's stdout and stderr compile messages
    Raises:
        subprocess.CalledProcessError: if ACE exits with a non-zero
            status
        OSError: if the ACE binary cannot be executed
    """
    try:
        check_call(
            [(executable or 'ace'), '-g', cfg_path, '-G', out_path],
            stdout=log, stderr=log, close_fds=True,
            env=(env or os.environ)
        )
    except (CalledProcessError, OSError):
        # Not every valid *log* target has a `name` (e.g.,
        # subprocess.DEVNULL is an int); fall back so that reporting
        # the failure never masks the original exception.
        logging.error(
            'Failed to compile grammar with ACE. See %s',
            getattr(log, 'name', '<stderr>')
        )
        raise
def parse_from_iterable(grm, data, **kwargs):
    """
    Parse each sentence in *data* with ACE using grammar *grm*.

    A single :class:`AceParser` is opened for the whole iteration and
    closed when the generator is exhausted or finalized.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): the sentences to parse
        **kwargs: additional keyword arguments to pass to the AceParser
    Yields:
        :class:`~delphin.interfaces.ParseResponse`
    Example:
        >>> sentences = ['Dogs bark.', 'It rained']
        >>> responses = list(ace.parse_from_iterable('erg.dat', sentences))
        NOTE: parsed 2 / 2 sentences, avg 723k, time 0.01026s
    """
    with AceParser(grm, **kwargs) as ace:
        for sentence in data:
            response = ace.interact(sentence)
            yield response
def parse(grm, datum, **kwargs):
    """
    Parse sentence *datum* with ACE using grammar *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum (str): the sentence to parse
        **kwargs: additional keyword arguments to pass to the AceParser
    Returns:
        :class:`~delphin.interfaces.ParseResponse`
    Example:
        >>> response = ace.parse('erg.dat', 'Dogs bark.')
        NOTE: parsed 1 / 1 sentences, avg 797k, time 0.00707s
    """
    # Use the context manager directly so the ACE subprocess is closed
    # before returning, rather than whenever a suspended generator
    # happens to be finalized.
    with AceParser(grm, **kwargs) as parser:
        return parser.interact(datum)
def transfer_from_iterable(grm, data, **kwargs):
    """
    Transfer from each MRS in *data* with ACE using grammar *grm*.

    A single :class:`AceTransferer` is opened for the whole iteration
    and closed when the generator is exhausted or finalized.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): source MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            AceTransferer
    Yields:
        :class:`~delphin.interfaces.ParseResponse`
    """
    with AceTransferer(grm, **kwargs) as ace:
        for source_mrs in data:
            response = ace.interact(source_mrs)
            yield response
def transfer(grm, datum, **kwargs):
    """
    Transfer from the MRS *datum* with ACE using grammar *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum: source MRS as a SimpleMRS string
        **kwargs: additional keyword arguments to pass to the
            AceTransferer
    Returns:
        :class:`~delphin.interfaces.ParseResponse`
    """
    # Use the context manager directly so the ACE subprocess is closed
    # before returning, rather than whenever a suspended generator
    # happens to be finalized.
    with AceTransferer(grm, **kwargs) as transferer:
        return transferer.interact(datum)
def generate_from_iterable(grm, data, **kwargs):
    """
    Generate from each MRS in *data* with ACE using grammar *grm*.

    A single :class:`AceGenerator` is opened for the whole iteration
    and closed when the generator is exhausted or finalized.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            AceGenerator
    Yields:
        :class:`~delphin.interfaces.ParseResponse`
    """
    with AceGenerator(grm, **kwargs) as ace:
        for mrs in data:
            response = ace.interact(mrs)
            yield response
def generate(grm, datum, **kwargs):
    """
    Generate from the MRS *datum* with ACE using *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum: the SimpleMRS string to generate from
        **kwargs: additional keyword arguments to pass to the
            AceGenerator
    Returns:
        :class:`~delphin.interfaces.ParseResponse`
    """
    # Use the context manager directly so the ACE subprocess is closed
    # before returning, rather than whenever a suspended generator
    # happens to be finalized.
    with AceGenerator(grm, **kwargs) as generator:
        return generator.interact(datum)
# The following defines the command-line options available for users to
# specify in AceProcess tasks. For a description of these options, see:
# http://moin.delph-in.net/AceOptions

# thanks: https://stackoverflow.com/a/14728477/1441112
class _ACEArgumentParser(argparse.ArgumentParser):
    """An argument parser that raises ValueError instead of exiting."""

    def error(self, message):
        raise ValueError(message)


_ace_argparser = _ACEArgumentParser()
_ace_argparser.add_argument('-n', type=int)
_ace_argparser.add_argument('-1', action='store_const', const=1, dest='n')
_ace_argparser.add_argument('-r')
_ace_argparser.add_argument('-p', action='store_true')
_ace_argparser.add_argument('-X', action='store_true')
_ace_argparser.add_argument('-L', action='store_true')
_ace_argparser.add_argument('-y', action='store_true')
_ace_argparser.add_argument('--max-chart-megabytes', type=int)
_ace_argparser.add_argument('--max-unpack-megabytes', type=int)
_ace_argparser.add_argument('--timeout', type=int)
_ace_argparser.add_argument('--disable-subsumption-test', action='store_true')
_ace_argparser.add_argument('--show-realization-trees', action='store_true')
_ace_argparser.add_argument('--show-realization-mrses', action='store_true')
_ace_argparser.add_argument('--show-probability', action='store_true')
_ace_argparser.add_argument('--disable-generalization', action='store_true')
_ace_argparser.add_argument('--ubertagging', nargs='?', type=float)
_ace_argparser.add_argument('--pcfg', type=argparse.FileType())
_ace_argparser.add_argument('--rooted-derivations', action='store_true')
_ace_argparser.add_argument('--udx', nargs='?', choices=('all',))
_ace_argparser.add_argument('--yy-rules', action='store_true')
_ace_argparser.add_argument('--max-words', type=int)


def _ace_version(executable):
    """
    Return the version of the ACE binary *executable* as a tuple of ints.

    The version is read from the output of `executable -V`. If that
    output contains no recognizable version string, `(0, 9, 0)` (the
    initial public release) is returned.

    Raises:
        subprocess.CalledProcessError, OSError: if the binary cannot
            be run successfully
    """
    version = (0, 9, 0)  # initial public release
    try:
        out = check_output([executable, '-V'], universal_newlines=True)
    except (CalledProcessError, OSError):
        logging.error('Failed to get ACE version number.')
        raise
    # Match dot-separated integers only, so that a trailing period after
    # the version cannot produce an empty component for int().
    match = re.search(r'ACE version (\d+(?:\.\d+)*)', out)
    if match is None:
        logging.warning(
            'Could not find the ACE version number; assuming %s.',
            '.'.join(map(str, version))
        )
    else:
        version = tuple(map(int, match.group(1).split('.')))
    return version


def _possible_mrs(s):
    """
    Return the first bracket-balanced `[...]` substring of *s*.

    Returns:
        the substring from the first top-level `[` to its matching
        `]`, or `False` if no such balanced span is found
    """
    start, end = -1, -1
    depth = 0
    for i, c in enumerate(s):
        if c == '[':
            if depth == 0:
                start = i
            depth += 1
        elif c == ']':
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    # only valid if neither start nor end is -1
    # note: this ignores any secondary MRSs on the same line
    if start != -1 and end != -1:
        # only log if taking a substring, i.e., if anything is trimmed
        # from either side of the input
        if start != 0 or end != len(s):
            logging.debug('Possible MRS found at <%d:%d>: %s', start, end, s)
            s = s[start:end]
        return s
    else:
        return False


def _make_response(lines, run):
    """
    Sort ACE output *lines* into a new response and leftover content.

    Lines prefixed with 'NOTE: ', 'WARNING: ', or 'ERROR: ' are stored
    (without the prefix) under the respective response keys, and a
    'SENT: ' or 'SKIP: ' line sets the 'surface' value.

    Returns:
        a pair of the response object and the list of remaining lines
    """
    response = ParseResponse({
        'NOTES': [],
        'WARNINGS': [],
        'ERRORS': [],
        'run': run,
        'input': None,
        'surface': None,
        'results': []
    })
    content_lines = []
    for line in lines:
        if line.startswith('NOTE: '):
            response['NOTES'].append(line[6:])
        elif line.startswith('WARNING: '):
            response['WARNINGS'].append(line[9:])
        elif line.startswith('ERROR: '):
            response['ERRORS'].append(line[7:])
        elif line.startswith(('SENT: ', 'SKIP: ')):
            response['surface'] = line[6:]
        else:
            content_lines.append(line)
    return response, content_lines


def _sexpr_data(line):
    """
    Yield the data of each two-item S-expression parsed from *line*.

    Parsing stops, after logging an error, at the first expression
    whose data does not have exactly two items.
    """
    while line:
        expr = SExpr.parse(line)
        if len(expr.data) != 2:
            logging.error('Malformed output from ACE: {}'.format(line))
            break
        line = expr.remainder.lstrip()
        yield expr.data


def _tsdb_response(response, line):
    """
    Fill *response* with the (key, value) data parsed from *line*.

    Keys are stored without their leading ':'; ':p-input' and
    ':p-tokens' are stored under 'tokens', and each item of ':results'
    is appended as a dictionary to the response's 'results' list.

    Returns:
        the same *response* object
    """
    for key, val in _sexpr_data(line):
        if key == ':p-input':
            response.setdefault('tokens', {})['initial'] = val.strip()
        elif key == ':p-tokens':
            response.setdefault('tokens', {})['internal'] = val.strip()
        elif key == ':results':
            for result in val:
                res = {}
                for reskey, resval in result:
                    if reskey == ':derivation':
                        res['derivation'] = resval.strip()
                    elif reskey == ':mrs':
                        res['mrs'] = resval.strip()
                    elif reskey == ':surface':
                        res['surface'] = resval.strip()
                    elif isinstance(resval, stringtypes):
                        res[reskey[1:]] = resval.strip()
                    else:
                        res[reskey[1:]] = resval
                response['results'].append(res)
        elif isinstance(val, stringtypes):
            response[key[1:]] = val.strip()
        else:
            response[key[1:]] = val
    return response