"""
An interface for the ACE processor.
"""
import argparse
import locale
import logging
import os
import re
from datetime import datetime
from getpass import getuser # portable way to get username
from pathlib import Path
from platform import platform # portable system information
from socket import gethostname # portable way to get host name
from subprocess import (
PIPE,
CalledProcessError,
Popen,
check_call,
check_output,
)
from typing import (
IO,
Any,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Pattern,
Tuple,
)
from delphin import interface, util
# Default modules need to import the PyDelphin version
from delphin.__about__ import __version__ # noqa: F401
from delphin.exceptions import PyDelphinException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# do this right away to avoid some encoding issues
# (an empty locale string selects the user's default locale settings)
locale.setlocale(locale.LC_ALL, '')
# Preferred text encoding under that locale; passing False tells
# getpreferredencoding() not to call setlocale() itself.
encoding = locale.getpreferredencoding(False)
[docs]
class ACEProcessError(PyDelphinException):
    """
    Error signalling that the ACE subprocess failed and cannot be
    recovered (for instance, when it exits during startup).
    """
[docs]
class ACEProcess(interface.Processor):
    """
    The base class for interfacing ACE.

    This manages most subprocess communication with ACE, but does not
    interpret the response returned via ACE's stdout. Subclasses
    override the :meth:`receive` method to interpret the task-specific
    response formats.

    Note that not all arguments to this class are used by every
    subclass; the documentation for each subclass specifies which are
    available.

    Args:
        grm (str): path to a compiled grammar image
        cmdargs (list, optional): a list of command-line arguments
            for ACE; note that arguments and their values should be
            separate entries, e.g. `['-n', '5']`
        executable (str, optional): the path to the ACE binary; if
            `None`, ACE is assumed to be callable via `ace`
        env (dict): environment variables to pass to the ACE
            subprocess
        tsdbinfo (bool): if `True` and ACE's version is compatible,
            all information ACE reports for [incr tsdb()] processing
            is gathered and returned in the response
        full_forest (bool): if `True` and *tsdbinfo* is `True`, output
            the full chart for each parse result
        stderr (file): stream used for ACE's stderr
    Raises:
        ValueError: if *cmdargs* contains an option that is not
            accepted by the argument validator
        ACEProcessError: if the ACE process exits with a non-zero
            status on startup
    """

    # Task-specific defaults that subclasses override at class level.
    # They are copied onto each instance in __init__() and must never
    # be mutated in place, as they are shared by every instance.
    _cmdargs: List[str] = []
    _termini: List[Pattern[str]] = []

    def __init__(self,
                 grm: util.PathLike,
                 cmdargs: Optional[List[str]] = None,
                 executable: Optional[util.PathLike] = None,
                 env: Optional[Mapping[str, str]] = None,
                 tsdbinfo: bool = True,
                 full_forest: bool = False,
                 stderr: Optional[IO[Any]] = None):
        self.grm = str(Path(grm).expanduser())

        # Copy the list: version-dependent flags are appended below and
        # must not leak into the caller's list (a reused list would
        # otherwise fail validation the second time around).
        self.cmdargs = list(cmdargs or [])
        # validate the arguments
        _ace_argparser.parse_args(self.cmdargs)
        # Per-instance copy of the class-level defaults so that adding
        # '--itsdb-forest' does not affect other instances or subclasses.
        self._cmdargs = list(self._cmdargs)

        self.executable = 'ace'
        if executable:
            # os.path.expanduser() is used instead of Path because Path
            # would normalize './ace' to 'ace', which turns an explicit
            # relative path into a lookup on PATH.
            self.executable = os.path.expanduser(os.fspath(executable))

        ace_version = self.ace_version
        if ace_version >= (0, 9, 14):
            self.cmdargs.append('--tsdb-notes')
        if tsdbinfo and ace_version >= (0, 9, 24):
            self.cmdargs.extend(['--tsdb-stdout', '--report-labels'])
            self.receive = self._tsdb_receive
            if full_forest:
                self._cmdargs.append('--itsdb-forest')
        else:
            self.receive = self._default_receive

        self.env = env or os.environ
        self._run_id = -1  # incremented each time the process is opened
        self.run_infos: List[Dict[str, Any]] = []
        self._stderr = stderr
        self._open()

    @property
    def ace_version(self) -> Tuple[int, ...]:
        """The version of the specified ACE binary."""
        return _ace_version(self.executable)

    @property
    def run_info(self) -> Dict[str, Any]:
        """Contextual information about the running process."""
        return self.run_infos[-1]

    def _open(self) -> None:
        """
        Start the ACE subprocess and record a new run-info entry.

        Raises:
            ACEProcessError: if the process has already exited with a
                non-zero return code
        """
        self._p = Popen(
            [self.executable, '-g', self.grm] + self._cmdargs + self.cmdargs,
            stdin=PIPE,
            stdout=PIPE,
            stderr=self._stderr,
            env=self.env,
            universal_newlines=True
        )
        self._run_id += 1
        self.run_infos.append({
            'run-id': self._run_id,
            'application': 'ACE {} via PyDelphin v{}'.format(
                '.'.join(map(str, self.ace_version)), __version__),
            'environment': ' '.join(self.cmdargs),
            'user': getuser(),
            'host': gethostname(),
            'os': platform(),
            'start': datetime.now()
        })
        if self._p.poll() is not None and self._p.returncode != 0:
            raise ACEProcessError("ACE process closed on startup")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # don't try to handle any exceptions

    def _result_lines(
        self,
        termini: Optional[List[Pattern[str]]] = None
    ) -> List[str]:
        """
        Read lines from ACE's stdout until all *termini* are matched.

        The patterns in *termini* are matched in order, each against a
        later line than the previous one; reading stops once the last
        one has matched or when the process is found to have closed.

        Args:
            termini: patterns marking the end of a response; if
                `None`, the class's `_termini` are used
        Returns:
            the non-empty lines read, with trailing whitespace removed
        """
        poll = self._p.poll
        assert self._p.stdout is not None, 'cannot receive output from ACE'
        next_line = self._p.stdout.readline

        if termini is None:
            termini = self._termini
        i, end = 0, len(termini)

        lines = []
        while i < end:
            s = next_line()
            if s == '' and poll() is not None:
                logger.info(
                    'Process closed unexpectedly; giving up.'
                )
                self.close()
                break
            # The 'run' note should appear when the process is opened, but
            # handle it here to avoid potential deadlocks if it gets buffered
            elif s.startswith('NOTE: tsdb run:'):
                self._read_run_info(s.rstrip())
            # the rest should be normal result lines
            else:
                lines.append(s.rstrip())
                # look up the current terminus on every line so that
                # each pattern in the sequence is actually used
                if termini[i].search(s):
                    i += 1
        return [line for line in lines if line != '']

    def _read_run_info(self, line: str) -> None:
        """Merge the data of a 'NOTE: tsdb run:' line into `run_info`."""
        assert line.startswith('NOTE: tsdb run:')
        # 15 == len('NOTE: tsdb run:')
        for key, value in _sexpr_data(line[15:].lstrip()):
            if key == ':application':
                continue  # PyDelphin sets 'application'
            self.run_info[key.lstrip(':')] = value

    def send(self, datum: str) -> None:
        """
        Send *datum* (e.g. a sentence or MRS) to ACE.

        Warning:
            Sending data without reading (e.g., via :meth:`receive`) can
            fill the buffer and cause data to be lost. Use the
            :meth:`interact` method for most data-processing tasks with
            ACE.
        """
        assert self._p.stdin is not None, 'cannot send inputs to ACE'
        try:
            self._p.stdin.write((datum.rstrip() + '\n'))
            self._p.stdin.flush()
        # NOTE(review): the ValueError raised when writing to a pipe
        # that was closed manually is not caught here -- confirm that
        # this is intended.
        except (IOError, OSError):
            logger.info(
                'Attempted to write to a closed process; attempting to reopen'
            )
            self._open()
            self._p.stdin.write((datum.rstrip() + '\n'))
            self._p.stdin.flush()

    def receive(self) -> interface.Response:
        """
        Return the stdout response from ACE.

        Instances replace this method in the constructor with either
        the [incr tsdb()] reader or the task's default reader.

        Warning:
            Reading beyond the last line of stdout from ACE can cause
            the process to hang while it waits for the next line. Use
            the :meth:`interact` method for most data-processing tasks
            with ACE.
        """
        raise NotImplementedError()

    def _default_receive(self) -> interface.Response:
        """Read a response in the task's plain format (per subclass)."""
        raise NotImplementedError()

    def _tsdb_receive(self) -> interface.Response:
        """Read a response in the format selected by `--tsdb-stdout`."""
        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)
        # now it should be safe to reopen a closed process (if necessary)
        if self._p.poll() is not None:
            logger.info('Attempting to restart ACE.')
            self._open()
        line = ' '.join(lines)  # ACE 0.9.24 on Mac puts superfluous newlines
        response = _tsdb_response(response, line)
        return response

    def interact(self, datum: str) -> interface.Response:
        """
        Send *datum* to ACE and return the response.

        This is the recommended method for sending and receiving data
        to/from an ACE process as it reduces the chances of
        over-filling or reading past the end of the buffer. It also
        performs a simple validation of the input to help ensure that
        one complete item is processed at a time.

        If input item identifiers need to be tracked throughout
        processing, see :meth:`process_item`.

        Args:
            datum (str): the input sentence or MRS
        Returns:
            :class:`~delphin.interface.Response`
        Raises:
            TypeError: if *datum* is not a string
        """
        if not isinstance(datum, str):
            raise TypeError('interact() argument must be a string, '
                            f'not {type(datum).__name__!r}')
        validated = self._validate_input(datum)
        if validated:
            self.send(validated)
            result = self.receive()
        else:
            # nothing is sent to ACE; build a response explaining why
            result, _ = _make_response(
                [('NOTE: PyDelphin could not validate the input and '
                  'refused to send it to ACE'),
                 f'SKIP: {datum}'],
                self.run_info)
        result['input'] = datum
        return result

    def process_item(self,
                     datum: str,
                     keys: Optional[Dict[str, Any]] = None
                     ) -> interface.Response:
        """
        Send *datum* to ACE and return the response with context.

        The *keys* parameter can be used to track item identifiers
        through an ACE interaction. If the `task` member is set on
        the ACEProcess instance (or one of its subclasses), it is
        kept in the response as well.

        Args:
            datum (str): the input sentence or MRS
            keys (dict): a mapping of item identifier names and values
        Returns:
            :class:`~delphin.interface.Response`
        """
        response = self.interact(datum)
        if keys is not None:
            response['keys'] = keys
        if 'task' not in response and self.task is not None:
            response['task'] = self.task
        return response

    def close(self) -> int:
        """
        Close the ACE process and return the process's exit code.
        """
        self.run_info['end'] = datetime.now()
        if self._p.stdin is not None:
            self._p.stdin.close()
        if self._p.stdout is not None:
            # drain whatever ACE printed while shutting down
            for line in self._p.stdout:
                if line.startswith('NOTE: tsdb run:'):
                    self._read_run_info(line)
                else:
                    logger.debug('ACE cleanup: %s', line.rstrip())
        retval = self._p.wait()
        return retval

    def _validate_input(self, datum: str) -> str:
        """
        Return the text to send to ACE for *datum* (per subclass).

        A false value means that the input is invalid and must not be
        sent.
        """
        raise NotImplementedError()
[docs]
class ACEParser(ACEProcess):
    """
    An ACE process that handles parse requests.

    The initialization parameters are those of :class:`ACEProcess`.
    """

    task = 'parse'
    # two consecutive matches of an empty line end a plain response
    _termini = [re.compile(r'^$'), re.compile(r'^$')]

    def _validate_input(self, datum: str):
        # Only non-empty strings can be parsed; the stripped text is
        # returned so that an all-whitespace input evaluates as false.
        if not isinstance(datum, str):
            return False
        return datum.strip()

    def _default_receive(self):
        response, content = _make_response(
            self._result_lines(), self.run_info
        )
        fields = ('mrs', 'derivation')
        results = []
        for line in content:
            # each result line has its fields separated by ' ; '
            parts = [part.strip() for part in line.split(' ; ')]
            results.append(dict(zip(fields, parts)))
        response['results'] = results
        return response
[docs]
class ACETransferer(ACEProcess):
    """
    An ACE process that handles transfer requests.

    The initialization parameters are those of :class:`ACEProcess`,
    except that *tsdbinfo* and *full_forest* are not available (both
    are always passed on as `False`).
    """

    task = 'transfer'
    # a single empty line ends a response
    _termini = [re.compile(r'^$')]

    def __init__(self,
                 grm: util.PathLike,
                 cmdargs: Optional[List[str]] = None,
                 executable: Optional[util.PathLike] = None,
                 env: Optional[Mapping[str, str]] = None,
                 stderr: Optional[IO[Any]] = None):
        super().__init__(
            grm,
            cmdargs=cmdargs,
            executable=executable,
            env=env,
            tsdbinfo=False,
            full_forest=False,
            stderr=stderr,
        )

    def _validate_input(self, datum):
        # the input must contain something that looks like an MRS
        return _possible_mrs(datum)

    def _default_receive(self):
        response, content = _make_response(
            self._result_lines(), self.run_info
        )
        # every remaining content line is taken as one output MRS
        response['results'] = [{'mrs': text.strip()} for text in content]
        return response
[docs]
class ACEGenerator(ACEProcess):
    """
    A class for managing realization requests with ACE.

    See :class:`ACEProcess` for initialization parameters; the
    *full_forest* parameter is not available for generation.
    """

    task = 'generate'
    # arguments always passed to ACE for this task
    _cmdargs = ['-e', '--tsdb-notes']
    # a plain response ends with the 'tsdb parse' note
    _termini = [re.compile(r'NOTE: tsdb parse: ')]

    def __init__(self,
                 grm: util.PathLike,
                 cmdargs: Optional[List[str]] = None,
                 executable: Optional[util.PathLike] = None,
                 env: Optional[Mapping[str, str]] = None,
                 tsdbinfo: bool = True,
                 stderr: Optional[IO[Any]] = None):
        super().__init__(grm, cmdargs=cmdargs, executable=executable, env=env,
                         tsdbinfo=tsdbinfo, full_forest=False, stderr=stderr)

    def _validate_input(self, datum):
        # the input must contain something that looks like an MRS
        return _possible_mrs(datum)

    def _default_receive(self):
        """
        Read a plain response as a list of realizations.

        Each realization is a sentence line, optionally followed by a
        'DTREE = ' line and a 'MRS = ' line when the corresponding
        `--show-realization-*` options were given.
        """
        show_tree = '--show-realization-trees' in self.cmdargs
        show_mrs = '--show-realization-mrses' in self.cmdargs
        lines = self._result_lines()
        response, lines = _make_response(lines, self.run_info)
        i, numlines = 0, len(lines)
        results = []
        while i < numlines:
            result = {'SENT': lines[i].strip()}
            i += 1
            # The bounds checks guard against a final realization that
            # is not followed by its optional lines.
            if (show_tree
                    and i < numlines
                    and lines[i].startswith('DTREE = ')):
                result['derivation'] = lines[i][8:].strip()
                i += 1
            if (show_mrs
                    and i < numlines
                    and lines[i].startswith('MRS = ')):
                result['mrs'] = lines[i][6:].strip()
                i += 1
            results.append(result)
        response['results'] = results
        return response

    def _tsdb_receive(self):
        """Read a response in the format selected by `--tsdb-stdout`."""
        # with --tsdb-stdout, the notes line is not printed
        lines = self._result_lines(termini=[re.compile(r'\(:results \.')])
        response, lines = _make_response(lines, self.run_info)
        line = ' '.join(lines)  # ACE 0.9.24 on Mac puts superfluous newlines
        response = _tsdb_response(response, line)
        return response
[docs]
def compile(cfg_path: util.PathLike,
            out_path: util.PathLike,
            executable: Optional[util.PathLike] = None,
            env: Optional[Mapping[str, str]] = None,
            stdout: Optional[IO[Any]] = None,
            stderr: Optional[IO[Any]] = None) -> None:
    """
    Use ACE to compile a grammar.

    Args:
        cfg_path (str): the path to the ACE config file
        out_path (str): the path where the compiled grammar will be
            written
        executable (str, optional): the path to the ACE binary; if
            `None`, the `ace` command will be used
        env (dict, optional): environment variables to pass to the ACE
            subprocess
        stdout (file, optional): stream used for ACE's stdout
        stderr (file, optional): stream used for ACE's stderr
    Raises:
        subprocess.CalledProcessError: if ACE exits with a non-zero
            status
        OSError: if the ACE binary cannot be executed
    """
    cfg_path = str(Path(cfg_path).expanduser())
    out_path = str(Path(out_path).expanduser())
    # Expand '~' in the executable as is done for the other paths.
    # os.path.expanduser() is used instead of Path because Path would
    # normalize './ace' to 'ace', which turns an explicit relative path
    # into a lookup on PATH.
    if executable:
        ace = os.path.expanduser(os.fspath(executable))
    else:
        ace = 'ace'
    try:
        check_call(
            [ace, '-g', cfg_path, '-G', out_path],
            stdout=stdout, stderr=stderr, close_fds=True,
            env=(env or os.environ)
        )
    except (CalledProcessError, OSError):
        # log where to look for details, then let the caller handle it
        logger.error(
            'Failed to compile grammar with ACE. See %s',
            getattr(stderr, 'name', '<stderr>')
        )
        raise
[docs]
def parse_from_iterable(
        grm: util.PathLike,
        data: Iterable[str],
        **kwargs: Any) -> Iterator[interface.Response]:
    """
    Lazily parse the sentences in *data* with ACE and grammar *grm*.

    A single ACEParser is used for all sentences and is closed when
    the iteration finishes.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): the sentences to parse
        **kwargs: additional keyword arguments to pass to the ACEParser
    Yields:
        :class:`~delphin.interface.Response`
    Example:
        >>> sentences = ['Dogs bark.', 'It rained']
        >>> responses = list(ace.parse_from_iterable('erg.dat', sentences))
        NOTE: parsed 2 / 2 sentences, avg 723k, time 0.01026s
    """
    with ACEParser(grm, **kwargs) as processor:
        yield from map(processor.interact, data)
[docs]
def parse(grm: util.PathLike,
          datum: str,
          **kwargs: Any) -> interface.Response:
    """
    Parse sentence *datum* with ACE using grammar *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum (str): the sentence to parse
        **kwargs: additional keyword arguments to pass to the ACEParser
    Returns:
        :class:`~delphin.interface.Response`
    Example:
        >>> response = ace.parse('erg.dat', 'Dogs bark.')
        NOTE: parsed 1 / 1 sentences, avg 797k, time 0.00707s
    """
    # Use the parser directly rather than taking one item from a
    # generator: the ACE process is then closed when the block exits
    # instead of whenever the suspended generator is finalized.
    with ACEParser(grm, **kwargs) as parser:
        return parser.interact(datum)
[docs]
def transfer_from_iterable(
        grm: util.PathLike,
        data: Iterable[str],
        **kwargs: Any) -> Iterator[interface.Response]:
    """
    Lazily transfer the MRSs in *data* with ACE and grammar *grm*.

    A single ACETransferer is used for all inputs and is closed when
    the iteration finishes.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): source MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            ACETransferer
    Yields:
        :class:`~delphin.interface.Response`
    """
    with ACETransferer(grm, **kwargs) as processor:
        yield from map(processor.interact, data)
[docs]
def transfer(grm: util.PathLike,
             datum: str,
             **kwargs: Any) -> interface.Response:
    """
    Transfer from the MRS *datum* with ACE using grammar *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum: source MRS as a SimpleMRS string
        **kwargs: additional keyword arguments to pass to the
            ACETransferer
    Returns:
        :class:`~delphin.interface.Response`
    """
    # Use the transferer directly rather than taking one item from a
    # generator: the ACE process is then closed when the block exits
    # instead of whenever the suspended generator is finalized.
    with ACETransferer(grm, **kwargs) as transferer:
        return transferer.interact(datum)
[docs]
def generate_from_iterable(
        grm: util.PathLike,
        data: Iterable[str],
        **kwargs: Any) -> Iterator[interface.Response]:
    """
    Lazily generate from the MRSs in *data* with ACE and grammar *grm*.

    A single ACEGenerator is used for all inputs and is closed when
    the iteration finishes.

    Args:
        grm (str): path to a compiled grammar image
        data (iterable): MRSs as SimpleMRS strings
        **kwargs: additional keyword arguments to pass to the
            ACEGenerator
    Yields:
        :class:`~delphin.interface.Response`
    """
    with ACEGenerator(grm, **kwargs) as processor:
        yield from map(processor.interact, data)
[docs]
def generate(grm: util.PathLike,
             datum: str,
             **kwargs: Any) -> interface.Response:
    """
    Generate from the MRS *datum* with ACE using *grm*.

    Args:
        grm (str): path to a compiled grammar image
        datum: the SimpleMRS string to generate from
        **kwargs: additional keyword arguments to pass to the
            ACEGenerator
    Returns:
        :class:`~delphin.interface.Response`
    """
    # Use the generator process directly rather than taking one item
    # from a Python generator: the ACE process is then closed when the
    # block exits instead of whenever the suspended generator is
    # finalized.
    with ACEGenerator(grm, **kwargs) as generator:
        return generator.interact(datum)
# The following defines the command-line options available for users to
# specify in ACEProcess tasks. For a description of these options, see:
# https://github.com/delph-in/docs/wiki/AceOptions
# thanks: https://stackoverflow.com/a/14728477/1441112
class _ACEArgumentParser(argparse.ArgumentParser):
def error(self, message):
raise ValueError(message)
_ace_argparser = _ACEArgumentParser()
_ace_argparser.add_argument('-n', type=int)
_ace_argparser.add_argument('-1', action='store_const', const=1, dest='n')
_ace_argparser.add_argument('-r')
_ace_argparser.add_argument('-p', action='store_true')
_ace_argparser.add_argument('-X', action='store_true')
_ace_argparser.add_argument('-L', action='store_true')
_ace_argparser.add_argument('-y', action='store_true')
_ace_argparser.add_argument('--max-chart-megabytes', type=int)
_ace_argparser.add_argument('--max-unpack-megabytes', type=int)
_ace_argparser.add_argument('--timeout', type=int)
_ace_argparser.add_argument('--disable-subsumption-test', action='store_true')
_ace_argparser.add_argument('--show-realization-trees', action='store_true')
_ace_argparser.add_argument('--show-realization-mrses', action='store_true')
_ace_argparser.add_argument('--show-probability', action='store_true')
_ace_argparser.add_argument('--disable-generalization', action='store_true')
_ace_argparser.add_argument('--ubertagging', nargs='?', type=float)
_ace_argparser.add_argument('--pcfg', type=argparse.FileType())
_ace_argparser.add_argument('--rooted-derivations', action='store_true')
_ace_argparser.add_argument('--udx', nargs='?', choices=('all',))
_ace_argparser.add_argument('--yy-rules', action='store_true')
_ace_argparser.add_argument('--max-words', type=int)
def _ace_version(executable: str) -> Tuple[int, ...]:
# 0.9.0 is the initial public release of ACE
version: Tuple[int, ...] = (0, 9, 0)
try:
out = check_output([executable, '-V'], universal_newlines=True)
except (CalledProcessError, OSError):
logger.error('Failed to get ACE version number.')
raise
else:
match = re.search(r'ACE version ([.0-9]+)', out)
if match is not None:
version = tuple(map(int, match.group(1).split('.')))
return version
def _possible_mrs(s: str) -> str:
start, end = -1, -1
depth = 0
for i, c in enumerate(s):
if c == '[':
if depth == 0:
start = i
depth += 1
elif c == ']':
depth -= 1
if depth == 0:
end = i + 1
break
# only valid if neither start nor end is -1
# note: this ignores any secondary MRSs on the same line
if start != -1 and end != -1:
# only log if taking a substring
if start != 0 and end != len(s):
logger.debug('Possible MRS found at <%d:%d>: %s', start, end, s)
s = s[start:end]
return s
else:
return ''
def _make_response(lines, run) -> Tuple[interface.Response, List[str]]:
    """
    Build a response from ACE's output *lines* for the run *run*.

    Lines with a 'NOTE: ', 'WARNING: ', or 'ERROR: ' prefix are stored
    (without the prefix) under the corresponding response key, and a
    'SENT: ' or 'SKIP: ' line sets the surface string. All other lines
    are returned, in order, for further interpretation by the caller.

    Returns:
        a pair of the response and the list of remaining lines
    """
    response = interface.Response({
        'NOTES': [],
        'WARNINGS': [],
        'ERRORS': [],
        'run': run,
        'input': None,
        'surface': None,
        'results': []
    })
    # message prefixes and the response keys that collect them
    channels = (
        ('NOTE: ', 'NOTES'),
        ('WARNING: ', 'WARNINGS'),
        ('ERROR: ', 'ERRORS'),
    )
    remaining = []
    for line in lines:
        for prefix, key in channels:
            if line.startswith(prefix):
                response[key].append(line[len(prefix):])
                break
        else:
            # not a message line
            if line.startswith(('SENT: ', 'SKIP: ')):
                response['surface'] = line[6:]
            else:
                remaining.append(line)
    return response, remaining
def _sexpr_data(line: str) -> Iterator[Tuple[str, Any]]:
while line:
try:
expr = util.SExpr.parse(line)
except IndexError:
expr = util.SExprResult(
(':error', 'incomplete output from ACE'),
'')
if len(expr.data) != 2:
logger.error('Could not read output from ACE: %s', line)
break
key, val = expr.data
assert isinstance(key, str)
yield key, val
line = expr.remainder.lstrip()
def _tsdb_response(response: interface.Response,
                   line: str) -> interface.Response:
    """
    Fill *response* with the data in ACE's [incr tsdb()] output *line*.

    Keys are stored without their leading ':' and string values are
    stripped of surrounding whitespace. The ':p-input' and ':p-tokens'
    values go under `response['tokens']`, the ':results' entries are
    appended to `response['results']`, and the ':chart' edges become
    the list `response['chart']`.

    Returns:
        the same *response* object
    """
    # where the token strings are stored under response['tokens']
    token_slots = {':p-input': 'initial', ':p-tokens': 'internal'}
    # result fields whose values are always treated as strings
    text_fields = (':derivation', ':mrs', ':surface')

    for key, val in _sexpr_data(line):
        if key in token_slots:
            tokens = response.setdefault('tokens', {})
            tokens[token_slots[key]] = val.strip()
        elif key == ':results':
            for fields in val:
                entry = {}
                for name, value in fields:
                    if name in text_fields or isinstance(value, str):
                        entry[name[1:]] = value.strip()
                    else:
                        entry[name[1:]] = value
                response['results'].append(entry)
        elif key == ':chart':
            edges = []
            response['chart'] = edges
            for edge in val:
                edges.append({name[1:]: value for name, value in edge})
        elif isinstance(val, str):
            response[key[1:]] = val.strip()
        else:
            response[key[1:]] = val
    return response