Source code for delphin.ace

"""
An interface for the ACE processor.
"""

import argparse
import locale
import logging
import os
import re
from collections.abc import Iterable, Iterator, Mapping
from datetime import datetime
from getpass import getuser  # portable way to get username
from pathlib import Path
from platform import platform  # portable system information
from re import Pattern
from socket import gethostname  # portable way to get host name
from subprocess import (
    PIPE,
    CalledProcessError,
    Popen,
    check_call,
    check_output,
)
from typing import (
    IO,
    Any,
    ClassVar,
)

from delphin import interface, util

# Default modules need to import the PyDelphin version
from delphin.__about__ import __version__
from delphin.exceptions import PyDelphinException

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# do this right away to avoid some encoding issues
# NOTE: this runs at import time and switches every locale category of the
# whole process to the user's default locale settings.
locale.setlocale(locale.LC_ALL, "")
# Preferred text encoding; passing False asks getpreferredencoding() to report
# the encoding without calling setlocale() itself.
# NOTE(review): `encoding` is not referenced anywhere else in the visible
# source -- presumably kept for external users; verify before removing.
encoding = locale.getpreferredencoding(False)


class ACEProcessError(PyDelphinException):
    """
    Raised when the ACE process has crashed and cannot be recovered.

    Within this module it is raised by :meth:`ACEProcess._open` when
    the ACE subprocess has already exited with a non-zero status right
    after being started.
    """
class ACEProcess(interface.Processor):
    """
    The base class for interfacing ACE.

    This manages most subprocess communication with ACE, but does not
    interpret the response returned via ACE's stdout. Subclasses
    override the :meth:`receive` method to interpret the task-specific
    response formats.

    Note that not all arguments to this class are used by every
    subclass; the documentation for each subclass specifies which are
    available.

    Args:
        grm (str): path to a compiled grammar image
        cmdargs (list, optional): a list of command-line arguments
            for ACE; note that arguments and their values should be
            separate entries, e.g. `['-n', '5']`. The list is copied,
            so the caller's list is never modified.
        executable (str, optional): the path to the ACE binary; if
            `None`, ACE is assumed to be callable via `ace`
        env (dict): environment variables to pass to the ACE
            subprocess
        tsdbinfo (bool): if `True` and ACE's version is compatible,
            all information ACE reports for [incr tsdb()] processing
            is gathered and returned in the response
        full_forest (bool): if `True` and *tsdbinfo* is `True`, output
            the full chart for each parse result
        stderr (file): stream used for ACE's stderr
    """

    # arguments that PyDelphin always passes to ACE (before *cmdargs*)
    _cmdargs: tuple[str, ...] = ()
    # patterns that, matched one after another, mark the end of a response
    _termini: ClassVar[tuple[Pattern[str], ...]] = ()

    def __init__(
        self,
        grm: util.PathLike,
        cmdargs: list[str] | None = None,
        executable: util.PathLike | None = None,
        env: Mapping[str, str] | None = None,
        tsdbinfo: bool = True,
        full_forest: bool = False,
        stderr: IO[Any] | None = None,
    ):
        self.grm = str(Path(grm).expanduser())

        # Work on a copy: options are appended below, and mutating the
        # caller's list would make a second processor built from the same
        # list fail validation (the added options are not user options).
        self.cmdargs: list[str] = list(cmdargs) if cmdargs else []
        # validate the arguments
        _ace_argparser.parse_args(self.cmdargs)

        self.executable = "ace"
        if executable:
            self.executable = str(Path(executable).expanduser())

        ace_version = self.ace_version
        if ace_version >= (0, 9, 14):
            self.cmdargs.append("--tsdb-notes")
        if tsdbinfo and ace_version >= (0, 9, 24):
            self.cmdargs.extend(["--tsdb-stdout", "--report-labels"])
            self.receive = self._tsdb_receive
            if full_forest:
                self._cmdargs = (*self._cmdargs, "--itsdb-forest")
        else:
            self.receive = self._default_receive

        self.env = env or os.environ
        self._run_id = -1
        self.run_infos: list[dict[str, Any]] = []
        self._stderr = stderr
        self._open()

    @property
    def ace_version(self) -> tuple[int, ...]:
        """The version of the specified ACE binary."""
        # note: this queries the binary on every access
        return _ace_version(self.executable)

    @property
    def run_info(self) -> dict[str, Any]:
        """Contextual information about the running process."""
        return self.run_infos[-1]

    def _open(self) -> None:
        """
        Start a new ACE subprocess and record a new run-info entry.

        Raises:
            ACEProcessError: if the process has already exited with a
                non-zero status by the time it is polled
        """
        self._p = Popen(
            [self.executable, "-g", self.grm, *self._cmdargs, *self.cmdargs],
            stdin=PIPE,
            stdout=PIPE,
            stderr=self._stderr,
            env=self.env,
            universal_newlines=True,
        )
        self._run_id += 1
        self.run_infos.append(
            {
                "run-id": self._run_id,
                "application": "ACE {} via PyDelphin v{}".format(
                    ".".join(map(str, self.ace_version)), __version__
                ),
                "environment": " ".join(self.cmdargs),
                "user": getuser(),
                "host": gethostname(),
                "os": platform(),
                "start": datetime.now(),
            }
        )
        if self._p.poll() is not None and self._p.returncode != 0:
            raise ACEProcessError("ACE process closed on startup")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # don't try to handle any exceptions

    def _result_lines(self, termini: list[Pattern[str]] | None = None) -> list[str]:
        """
        Read lines from ACE's stdout until every terminus has matched.

        The patterns in *termini* (default: the class's `_termini`) are
        consumed in order: once the current pattern matches a line, the
        next pattern becomes the current one, and reading stops after
        the last pattern has matched. An empty sequence of termini
        reads nothing.

        Returns:
            the non-empty result lines with trailing whitespace removed
        """
        poll = self._p.poll
        assert self._p.stdout is not None, "cannot receive output from ACE"
        next_line = self._p.stdout.readline

        if termini is None:
            termini = list(self._termini)
        i, end = 0, len(termini)

        lines = []
        while i < end:
            s = next_line()
            if s == "" and poll() is not None:
                logger.info("Process closed unexpectedly; giving up.")
                self.close()
                break
            # The 'run' note should appear when the process is opened, but
            # handle it here to avoid potential deadlocks if it gets buffered
            elif s.startswith("NOTE: tsdb run:"):
                self._read_run_info(s.rstrip())
            # the rest should be normal result lines
            else:
                lines.append(s.rstrip())
                # always test against the *current* terminus so that
                # differing patterns are honored in sequence
                if termini[i].search(s):
                    i += 1
        return [line for line in lines if line != ""]

    def _read_run_info(self, line: str) -> None:
        """Merge the data of a ``NOTE: tsdb run:`` line into :attr:`run_info`."""
        assert line.startswith("NOTE: tsdb run:")
        # 15 == len("NOTE: tsdb run:")
        for key, value in _sexpr_data(line[15:].lstrip()):
            if key == ":application":
                continue  # PyDelphin sets 'application'
            self.run_info[key.lstrip(":")] = value

    def send(self, datum: str) -> None:
        """
        Send *datum* (e.g. a sentence or MRS) to ACE.

        Warning:
            Sending data without reading (e.g., via :meth:`receive`) can
            fill the buffer and cause data to be lost. Use the
            :meth:`interact` method for most data-processing tasks with
            ACE.
        """
        assert self._p.stdin is not None, "cannot send inputs to ACE"
        try:
            self._p.stdin.write(datum.rstrip() + "\n")
            self._p.stdin.flush()
        except OSError:  # ValueError if file was closed manually
            logger.info("Attempted to write to a closed process; attempting to reopen")
            self._open()
            self._p.stdin.write(datum.rstrip() + "\n")
            self._p.stdin.flush()

    def receive(self) -> interface.Response:
        """
        Return the stdout response from ACE.

        On instances this name is rebound in ``__init__`` to either
        ``_tsdb_receive`` or ``_default_receive``.

        Warning:
            Reading beyond the last line of stdout from ACE can cause
            the process to hang while it waits for the next line. Use
            the :meth:`interact` method for most data-processing tasks
            with ACE.
        """
        raise NotImplementedError()

    def _default_receive(self) -> interface.Response:
        """Interpret ACE's plain output; implemented by subclasses."""
        raise NotImplementedError()

    def _tsdb_receive(self) -> interface.Response:
        """Read one response and interpret it with :func:`_tsdb_response`."""
        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)
        # now it should be safe to reopen a closed process (if necessary)
        if self._p.poll() is not None:
            logger.info("Attempting to restart ACE.")
            self._open()
        line = " ".join(lines)  # ACE 0.9.24 on Mac puts superfluous newlines
        response = _tsdb_response(response, line)
        return response

    def interact(self, datum: str) -> interface.Response:
        """
        Send *datum* to ACE and return the response.

        This is the recommended method for sending and receiving data
        to/from an ACE process as it reduces the chances of
        over-filling or reading past the end of the buffer. It also
        performs a simple validation of the input to help ensure that
        one complete item is processed at a time.

        If input item identifiers need to be tracked throughout
        processing, see :meth:`process_item`.

        Args:
            datum (str): the input sentence or MRS
        Returns:
            :class:`~delphin.interface.Response`
        Raises:
            TypeError: if *datum* is not a string
        """
        if not isinstance(datum, str):
            raise TypeError(
                f"interact() argument must be a string, not {type(datum).__name__!r}"
            )
        validated = self._validate_input(datum)
        if validated:
            self.send(validated)
            result = self.receive()
        else:
            # nothing is sent to ACE; build a response that explains why
            result, _ = _make_response(
                [
                    (
                        "NOTE: PyDelphin could not validate the input and "
                        "refused to send it to ACE"
                    ),
                    f"SKIP: {datum}",
                ],
                self.run_info,
            )
        result["input"] = datum
        return result

    def process_item(
        self, datum: str, keys: dict[str, Any] | None = None
    ) -> interface.Response:
        """
        Send *datum* to ACE and return the response with context.

        The *keys* parameter can be used to track item identifiers
        through an ACE interaction. If the `task` member is set on
        the ACEProcess instance (or one of its subclasses), it is
        kept in the response as well.

        Args:
            datum (str): the input sentence or MRS
            keys (dict): a mapping of item identifier names and values
        Returns:
            :class:`~delphin.interface.Response`
        """
        response = self.interact(datum)
        if keys is not None:
            response["keys"] = keys
        if "task" not in response and self.task is not None:
            response["task"] = self.task
        return response

    def close(self) -> int:
        """
        Close the ACE process and return the process's exit code.

        Closing stdin lets ACE finish; any remaining stdout is drained
        so a trailing ``NOTE: tsdb run:`` line still updates
        :attr:`run_info`.
        """
        self.run_info["end"] = datetime.now()
        if self._p.stdin is not None:
            self._p.stdin.close()
        if self._p.stdout is not None:
            for line in self._p.stdout:
                if line.startswith("NOTE: tsdb run:"):
                    self._read_run_info(line)
                else:
                    logger.debug("ACE cleanup: %s", line.rstrip())
        retval = self._p.wait()
        return retval

    def _validate_input(self, datum: str) -> str:
        """Return the text to send for *datum*, or a false value to skip it."""
        raise NotImplementedError()
class ACEParser(ACEProcess):
    """
    A class for managing parse requests with ACE.

    See :class:`ACEProcess` for initialization parameters.
    """

    task = "parse"
    # a response is complete once two empty lines have been read
    _termini = (re.compile(r"^$"), re.compile(r"^$"))

    def _validate_input(self, datum: str):
        # Anything that is not a string is rejected outright; otherwise the
        # stripped text is returned, and an all-whitespace input becomes the
        # empty string, which callers treat as invalid.
        if not isinstance(datum, str):
            return False
        return datum.strip()

    def _default_receive(self):
        # Each content line holds an MRS and a derivation separated by " ; ".
        raw_lines = self._result_lines()
        response, content = _make_response(raw_lines, self.run_info)
        field_names = ("mrs", "derivation")
        parsed = []
        for entry in content:
            pieces = [piece.strip() for piece in entry.split(" ; ")]
            parsed.append(dict(zip(field_names, pieces, strict=False)))
        response["results"] = parsed
        return response
class ACETransferer(ACEProcess):
    """
    A class for managing transfer requests with ACE.

    See :class:`ACEProcess` for initialization parameters.
    """

    task = "transfer"
    # a response is complete once one empty line has been read
    _termini = (re.compile(r"^$"),)

    def __init__(
        self,
        grm: util.PathLike,
        cmdargs: list[str] | None = None,
        executable: util.PathLike | None = None,
        env: Mapping[str, str] | None = None,
        stderr: IO[Any] | None = None,
    ):
        # [incr tsdb()] output and full-forest mode are always switched off
        # for transfer; everything else is handed to the base class as given.
        super().__init__(
            grm,
            tsdbinfo=False,
            full_forest=False,
            cmdargs=cmdargs,
            executable=executable,
            env=env,
            stderr=stderr,
        )

    def _validate_input(self, datum):
        # only the first bracketed span of the input is passed on
        candidate = _possible_mrs(datum)
        return candidate

    def _default_receive(self):
        # every content line of the response is one transferred MRS
        raw_lines = self._result_lines()
        response, content = _make_response(raw_lines, self.run_info)
        transferred = []
        for entry in content:
            transferred.append({"mrs": entry.strip()})
        response["results"] = transferred
        return response
class ACEGenerator(ACEProcess):
    """
    A class for managing realization requests with ACE.

    See :class:`ACEProcess` for initialization parameters.
    """

    task = "generate"
    _cmdargs = ("-e", "--tsdb-notes")
    # a response is complete once the "tsdb parse" note has been read
    _termini = (re.compile(r"NOTE: tsdb parse: "),)

    def __init__(
        self,
        grm: util.PathLike,
        cmdargs: list[str] | None = None,
        executable: util.PathLike | None = None,
        env: Mapping[str, str] | None = None,
        tsdbinfo: bool = True,
        stderr: IO[Any] | None = None,
    ):
        # full-forest mode is always switched off for generation
        super().__init__(
            grm,
            cmdargs=cmdargs,
            executable=executable,
            env=env,
            tsdbinfo=tsdbinfo,
            full_forest=False,
            stderr=stderr,
        )

    def _validate_input(self, datum):
        return _possible_mrs(datum)

    def _default_receive(self):
        """
        Read one response from ACE's plain (non-tsdb) output.

        Each result starts with a realization line. When the
        corresponding ``--show-realization-*`` option was given, it may
        be followed by a ``DTREE = `` line and/or an ``MRS = `` line.
        """
        show_tree = "--show-realization-trees" in self.cmdargs
        show_mrs = "--show-realization-mrses" in self.cmdargs

        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)

        i, numlines = 0, len(lines)
        results = []
        while i < numlines:
            # NOTE(review): the realization is stored under "SENT" here but
            # under "surface" in the tsdb path -- confirm this is intended.
            result = {"SENT": lines[i].strip()}
            i += 1
            # The optional lines may be missing (e.g. after the last
            # realization), so check the bounds before peeking at lines[i].
            if show_tree and i < numlines and lines[i].startswith("DTREE = "):
                result["derivation"] = lines[i][8:].strip()
                i += 1
            if show_mrs and i < numlines and lines[i].startswith("MRS = "):
                result["mrs"] = lines[i][6:].strip()
                i += 1
            results.append(result)
        response["results"] = results
        return response

    def _tsdb_receive(self):
        """Read one response from ACE's ``--tsdb-stdout`` output."""
        # with --tsdb-stdout, the notes line is not printed
        lines = self._result_lines(termini=[re.compile(r"\(:results \.")])
        response, lines = _make_response(lines, self.run_info)
        line = " ".join(lines)  # ACE 0.9.24 on Mac puts superfluous newlines
        response = _tsdb_response(response, line)
        return response
def compile(
    cfg_path: util.PathLike,
    out_path: util.PathLike,
    executable: util.PathLike | None = None,
    env: Mapping[str, str] | None = None,
    stdout: IO[Any] | None = None,
    stderr: IO[Any] | None = None,
) -> None:
    """
    Compile a grammar image with ACE.

    Runs ``<executable> -g <cfg_path> -G <out_path>`` after expanding
    ``~`` in both paths. A failure is logged and then re-raised.

    Args:
        cfg_path (str): the path to the ACE config file
        out_path (str): the path where the compiled grammar will be
            written
        executable (str, optional): the path to the ACE binary; if
            `None`, the `ace` command will be used
        env (dict, optional): environment variables to pass to the ACE
            subprocess
        stdout (file, optional): stream used for ACE's stdout
        stderr (file, optional): stream used for ACE's stderr
    """
    config = str(Path(cfg_path).expanduser())
    target = str(Path(out_path).expanduser())
    command = [executable or "ace", "-g", config, "-G", target]
    environment = env or os.environ
    try:
        check_call(
            command,
            stdout=stdout,
            stderr=stderr,
            close_fds=True,
            env=environment,
        )
    except (CalledProcessError, OSError):
        # point the user at wherever ACE's own diagnostics were written
        stderr_name = getattr(stderr, "name", "<stderr>")
        logger.error("Failed to compile grammar with ACE. See %s", stderr_name)
        raise
def parse_from_iterable(
    grm: util.PathLike, data: Iterable[str], **kwargs: Any
) -> Iterator[interface.Response]:
    """
    Lazily parse every sentence of *data* with ACE and grammar *grm*.

    A single :class:`ACEParser` is opened when iteration starts and is
    closed when the generator finishes or is closed.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): the sentences to parse
        **kwargs: additional keyword arguments to pass to the ACEParser
    Yields:
        :class:`~delphin.interface.Response`
    Example:
        >>> sentences = ["Dogs bark.", "It rained"]
        >>> responses = list(ace.parse_from_iterable("erg.dat", sentences))
        NOTE: parsed 2 / 2 sentences, avg 723k, time 0.01026s
    """
    with ACEParser(grm, **kwargs) as parser:
        # map() is lazy, so each sentence is sent only when it is requested
        yield from map(parser.interact, data)
def parse(grm: util.PathLike, datum: str, **kwargs: Any) -> interface.Response:
    """
    Parse sentence *datum* with ACE using grammar *grm*.

    The ACE process is started for this one request and is closed
    before the function returns (or raises).

    Args:
        grm (str): path to a compiled grammar image
        datum (str): the sentence to parse
        **kwargs: additional keyword arguments to pass to the ACEParser
    Returns:
        :class:`~delphin.interface.Response`
    Example:
        >>> response = ace.parse("erg.dat", "Dogs bark.")
        NOTE: parsed 1 / 1 sentences, avg 797k, time 0.00707s
    """
    # Use the context manager directly instead of next() on a throwaway
    # generator, so that closing ACE does not depend on when the suspended
    # generator happens to be finalized.
    with ACEParser(grm, **kwargs) as parser:
        return parser.interact(datum)
def transfer_from_iterable(
    grm: util.PathLike, data: Iterable[str], **kwargs: Any
) -> Iterator[interface.Response]:
    """
    Lazily transfer every MRS of *data* with ACE and grammar *grm*.

    A single :class:`ACETransferer` is opened when iteration starts and
    is closed when the generator finishes or is closed.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): source MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            ACETransferer
    Yields:
        :class:`~delphin.interface.Response`
    """
    with ACETransferer(grm, **kwargs) as transferer:
        # map() is lazy, so each MRS is sent only when it is requested
        yield from map(transferer.interact, data)
def transfer(grm: util.PathLike, datum: str, **kwargs: Any) -> interface.Response:
    """
    Transfer from the MRS *datum* with ACE using grammar *grm*.

    The ACE process is started for this one request and is closed
    before the function returns (or raises).

    Args:
        grm (str): path to a compiled grammar image
        datum: source MRS as a SimpleMRS string
        **kwargs: additional keyword arguments to pass to the
            ACETransferer
    Returns:
        :class:`~delphin.interface.Response`
    """
    # Use the context manager directly instead of next() on a throwaway
    # generator, so that closing ACE does not depend on when the suspended
    # generator happens to be finalized.
    with ACETransferer(grm, **kwargs) as transferer:
        return transferer.interact(datum)
def generate_from_iterable(
    grm: util.PathLike, data: Iterable[str], **kwargs: Any
) -> Iterator[interface.Response]:
    """
    Lazily generate from every MRS of *data* with ACE and grammar *grm*.

    A single :class:`ACEGenerator` is opened when iteration starts and
    is closed when the generator finishes or is closed.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            ACEGenerator
    Yields:
        :class:`~delphin.interface.Response`
    """
    with ACEGenerator(grm, **kwargs) as generator:
        # map() is lazy, so each MRS is sent only when it is requested
        yield from map(generator.interact, data)
def generate(grm: util.PathLike, datum: str, **kwargs: Any) -> interface.Response:
    """
    Generate from the MRS *datum* with ACE using *grm*.

    The ACE process is started for this one request and is closed
    before the function returns (or raises).

    Args:
        grm (str): path to a compiled grammar image
        datum: the SimpleMRS string to generate from
        **kwargs: additional keyword arguments to pass to the
            ACEGenerator
    Returns:
        :class:`~delphin.interface.Response`
    """
    # Use the context manager directly instead of next() on a throwaway
    # generator, so that closing ACE does not depend on when the suspended
    # generator happens to be finalized.
    with ACEGenerator(grm, **kwargs) as generator:
        return generator.interact(datum)
# The following defines the command-line options available for users to
# specify in ACEProcess tasks. For a description of these options, see:
# https://github.com/delph-in/docs/wiki/AceOptions


# thanks: https://stackoverflow.com/a/14728477/1441112
class _ACEArgumentParser(argparse.ArgumentParser):
    """Argument parser that raises :exc:`ValueError` instead of exiting."""

    def error(self, message):
        raise ValueError(message)


def _readable_file(path: str) -> str:
    """
    Check that *path* can be opened for reading and return it unchanged.

    This replaces ``argparse.FileType()``, which left an open file
    handle behind every time the arguments were validated. As with
    ``FileType``, ``-`` (standard input) is accepted without a check.

    Raises:
        argparse.ArgumentTypeError: if the file cannot be opened; the
            parser reports this through its ``error()`` method
    """
    if path != "-":
        try:
            with open(path):
                pass
        except OSError as exc:
            raise argparse.ArgumentTypeError(f"can't open '{path}': {exc}") from exc
    return path


_ace_argparser = _ACEArgumentParser()
_ace_argparser.add_argument("-n", type=int)
_ace_argparser.add_argument("-1", action="store_const", const=1, dest="n")
_ace_argparser.add_argument("-r")
_ace_argparser.add_argument("-p", action="store_true")
_ace_argparser.add_argument("-X", action="store_true")
_ace_argparser.add_argument("-L", action="store_true")
_ace_argparser.add_argument("-y", action="store_true")
_ace_argparser.add_argument("--max-chart-megabytes", type=int)
_ace_argparser.add_argument("--max-unpack-megabytes", type=int)
_ace_argparser.add_argument("--timeout", type=int)
_ace_argparser.add_argument("--disable-subsumption-test", action="store_true")
_ace_argparser.add_argument("--show-realization-trees", action="store_true")
_ace_argparser.add_argument("--show-realization-mrses", action="store_true")
_ace_argparser.add_argument("--show-probability", action="store_true")
_ace_argparser.add_argument("--disable-generalization", action="store_true")
_ace_argparser.add_argument("--ubertagging", nargs="?", type=float)
_ace_argparser.add_argument("--pcfg", type=_readable_file)
_ace_argparser.add_argument("--rooted-derivations", action="store_true")
_ace_argparser.add_argument("--udx", nargs="?", choices=("all",))
_ace_argparser.add_argument("--yy-rules", action="store_true")
_ace_argparser.add_argument("--max-words", type=int)


def _ace_version(executable: str) -> tuple[int, ...]:
    """
    Return the version of the ACE binary *executable* as a tuple of ints.

    The version is read from the output of ``<executable> -V``. If no
    version number can be found there, ``(0, 9, 0)`` is returned.

    Raises:
        CalledProcessError, OSError: if the binary cannot be run
            successfully (logged, then re-raised)
    """
    # 0.9.0 is the initial public release of ACE
    version: tuple[int, ...] = (0, 9, 0)
    try:
        out = check_output([executable, "-V"], universal_newlines=True)
    except (CalledProcessError, OSError):
        logger.error("Failed to get ACE version number.")
        raise
    else:
        match = re.search(r"ACE version ([.0-9]+)", out)
        if match is not None:
            # The pattern may also capture stray dots (e.g. a period that
            # ends the sentence); skip the empty fields they produce.
            parts = [int(part) for part in match.group(1).split(".") if part]
            if parts:
                version = tuple(parts)
    return version


def _possible_mrs(s: str) -> str:
    """
    Return the first balanced ``[...]`` span in *s*, or ``""`` if none.

    This is only a bracket-matching plausibility check; the contents
    of the span are not inspected.
    """
    start, end = -1, -1
    depth = 0
    for i, c in enumerate(s):
        if c == "[":
            if depth == 0:
                start = i
            depth += 1
        elif c == "]":
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    # only valid if neither start nor end is -1
    # note: this ignores any secondary MRSs on the same line
    if start != -1 and end != -1:
        # only log if taking a substring, i.e. if text is dropped on
        # either side of the span
        if start != 0 or end != len(s):
            logger.debug("Possible MRS found at <%d:%d>: %s", start, end, s)
            s = s[start:end]
        return s
    else:
        return ""


def _make_response(lines, run) -> tuple[interface.Response, list[str]]:
    """
    Build a fresh response from ACE output *lines*.

    Lines prefixed with ``NOTE: ``, ``WARNING: ``, or ``ERROR: `` are
    collected (without the prefix) under the response's "NOTES",
    "WARNINGS", and "ERRORS" keys; a ``SENT: `` or ``SKIP: `` line sets
    "surface". *run* is stored under "run".

    Returns:
        the response and the remaining (content) lines
    """
    response = interface.Response(
        {
            "NOTES": [],
            "WARNINGS": [],
            "ERRORS": [],
            "run": run,
            "input": None,
            "surface": None,
            "results": [],
        }
    )
    content_lines = []
    for line in lines:
        if line.startswith("NOTE: "):
            response["NOTES"].append(line[6:])
        elif line.startswith("WARNING: "):
            response["WARNINGS"].append(line[9:])
        elif line.startswith("ERROR: "):
            response["ERRORS"].append(line[7:])
        elif line.startswith(("SENT: ", "SKIP: ")):
            response["surface"] = line[6:]  # both prefixes are 6 characters
        else:
            content_lines.append(line)
    return response, content_lines


def _sexpr_data(line: str) -> Iterator[tuple[str, Any]]:
    """
    Yield ``(key, value)`` pairs parsed from the S-expressions in *line*.

    Parsing stops with an error log entry at the first expression that
    is not a pair. If the parser fails with :exc:`IndexError`, a final
    ``(":error", "incomplete output from ACE")`` pair is yielded.
    """
    while line:
        try:
            expr = util.SExpr.parse(line)
        except IndexError:
            # the empty remainder ends the loop after this pair
            expr = util.SExprResult((":error", "incomplete output from ACE"), "")
        if len(expr.data) != 2:
            logger.error("Could not read output from ACE: %s", line)
            break
        key, val = expr.data
        assert isinstance(key, str)
        yield key, val
        line = expr.remainder.lstrip()


def _tsdb_response(response: interface.Response, line: str) -> interface.Response:
    """
    Fill *response* with the [incr tsdb()] data found in *line*.

    ``:p-input`` and ``:p-tokens`` are stored under
    ``response["tokens"]`` as "initial" and "internal"; ``:results``
    entries are appended to ``response["results"]``; ``:chart`` becomes
    a list of edge dictionaries. Any other key is stored without its
    leading colon, with string values stripped.

    Returns:
        the same *response* object, modified in place
    """
    for key, val in _sexpr_data(line):
        if key == ":p-input":
            response.setdefault("tokens", {})["initial"] = val.strip()
        elif key == ":p-tokens":
            response.setdefault("tokens", {})["internal"] = val.strip()
        elif key == ":results":
            for result in val:
                res = {}
                for reskey, resval in result:
                    if reskey == ":derivation":
                        res["derivation"] = resval.strip()
                    elif reskey == ":mrs":
                        res["mrs"] = resval.strip()
                    elif reskey == ":surface":
                        res["surface"] = resval.strip()
                    elif isinstance(resval, str):
                        res[reskey[1:]] = resval.strip()
                    else:
                        res[reskey[1:]] = resval
                response["results"].append(res)
        elif key == ":chart":
            response["chart"] = [
                {edgekey[1:]: edgeval for edgekey, edgeval in edge} for edge in val
            ]
        elif isinstance(val, str):
            response[key[1:]] = val.strip()
        else:
            response[key[1:]] = val
    return response