"""
PyDelphin API counterparts to the `delphin` commands.
"""
import sys
from pathlib import Path
import tempfile
import logging
import warnings
from delphin import exceptions
from delphin import tsdb, itsdb, tsql
from delphin.lnk import Lnk
from delphin.semi import SemI, load as load_semi
from delphin import util
from delphin.exceptions import PyDelphinException
# Default modules need to import the PyDelphin version
from delphin.__about__ import __version__ # noqa: F401
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# EXCEPTIONS ##################################################################
class CommandError(exceptions.PyDelphinException):
    """
    Raised on an invalid command call.

    The command functions in this module raise this error when they
    are given arguments they cannot work with (e.g., an unknown
    codec, an unsupported conversion, or an invalid test suite path).
    """
###############################################################################
# CONVERT #####################################################################
def convert(path, source_fmt, target_fmt, select='result.mrs',
            properties=True, lnk=True, color=False, indent=None,
            show_status=False, predicate_modifiers=False,
            semi=None):
    """
    Convert between various DELPH-IN Semantics representations.

    If *source_fmt* ends with ``"-lines"``, then *path* must be an
    input file containing one representation per line to be read with
    the :func:`decode` function of the source codec. If *target_fmt*
    ends with ``"-lines"``, then any :attr:`HEADER`, :attr:`JOINER`,
    or :attr:`FOOTER` defined by the target codec are ignored. The
    *source_fmt* and *target_fmt* arguments are then downcased and
    hyphens are removed to normalize the codec name.

    Note:
        For syntax highlighting, `delphin.highlight`_ must be
        installed, and it is only available for select target formats.

        .. _delphin.highlight: https://github.com/delph-in/delphin.highlight

    Args:
        path (str, file): filename, testsuite directory, open file, or
            stream of input representations; if `None`, `sys.stdin`
            is used
        source_fmt (str): convert from this format
        target_fmt (str): convert to this format
        select (str): TSQL query for selecting data (ignored if *path*
            is not a testsuite directory; default: `"result.mrs"`)
        properties (bool): include morphosemantic properties if `True`
            (default: `True`)
        lnk (bool): include lnk surface alignments and surface strings
            if `True` (default: `True`)
        color (bool): apply syntax highlighting if `True` and
            *target_fmt* is `"simplemrs"` (default: `False`)
        indent (int, optional): specifies an explicit number of spaces
            for indentation
        show_status (bool): show disconnected EDS nodes (ignored if
            *target_fmt* is not `"eds"`; default: `False`)
        predicate_modifiers (bool): apply EDS predicate modification
            for certain kinds of patterns (ignored if *target_fmt* is
            not an EDS format; default: `False`)
        semi: a :class:`delphin.semi.SemI` object or path to a SEM-I
            (ignored if *target_fmt* is not `indexedmrs`)
    Returns:
        str: the converted representation
    Raises:
        CommandError: if a codec name is invalid, the conversion is
            not supported, or *select* does not name exactly one
            column
    """
    if path is None:
        path = sys.stdin

    # normalize codec names
    source_fmt, source_lines = _parse_format_name(source_fmt)
    target_fmt, target_lines = _parse_format_name(target_fmt)

    # process other arguments
    highlight = _get_highlighter(color, target_fmt)
    source_codec = _get_codec(source_fmt)
    target_codec = _get_codec(target_fmt)
    converter = _get_converter(source_codec, target_codec, predicate_modifiers)

    if len(tsql.inspect_query('select ' + select)['projection']) != 1:
        raise CommandError(
            'Exactly 1 column must be given in selection query: '
            '(e.g., result.mrs)')

    if semi is not None and not isinstance(semi, SemI):
        # lets ignore the SEM-I warnings until questions regarding
        # valid SEM-Is are resolved
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            semi = load_semi(semi)

    # read (both readers are generators, so nothing is consumed yet)
    kwargs = {}
    if source_fmt == 'indexedmrs' and semi is not None:
        kwargs['semi'] = semi
    if source_lines:
        xs = _read_lines(path, source_codec, kwargs)
    else:
        xs = _read(path, source_codec, select, kwargs)

    # convert if source representation != target representation
    xs = _iter_convert(converter, xs)

    # write
    kwargs = {}
    # NOTE: only a truthy *indent* is forwarded to the codec, whereas
    # any non-None *indent* switches the separators to newlines below
    if indent:
        kwargs['indent'] = indent
    if target_fmt == 'eds':
        kwargs['show_status'] = show_status
    if target_fmt == 'indexedmrs' and semi is not None:
        kwargs['semi'] = semi
    kwargs['properties'] = properties
    kwargs['lnk'] = lnk

    # Manually dealing with headers, joiners, and footers is to
    # accommodate streaming output. Otherwise it is the same as
    # calling the following:
    #     target_codec.dumps(xs, **kwargs)
    if target_lines:
        header = footer = ''
        joiner = '\n'
    else:
        header = getattr(target_codec, 'HEADER', '')
        joiner = getattr(target_codec, 'JOINER', ' ')
        footer = getattr(target_codec, 'FOOTER', '')
        if indent is not None:
            if header:
                header += '\n'
            joiner = joiner.strip() + '\n'
            if footer:
                footer = '\n' + footer

    parts = []
    for x in xs:
        try:
            s = target_codec.encode(x, **kwargs)
        except (PyDelphinException, KeyError, IndexError):
            # skip representations that cannot be encoded, but log them
            logger.exception('could not convert representation')
        else:
            parts.append(s)

    output = highlight(header + joiner.join(parts) + footer)
    return output
def _parse_format_name(name):
name = name.lower()
lines = False
if name.endswith('-lines'):
lines = True
name = name[:-6]
name = name.replace('-', '')
return name, lines
def _get_highlighter(color, target_fmt):
if color and target_fmt in ('simplemrs', 'simple-mrs'):
highlight = util.make_highlighter('simplemrs')
else:
highlight = str
return highlight
def _get_codec(name):
    """
    Import and return the codec module called *name*.

    Raises:
        CommandError: if importing the codec raises :exc:`KeyError`
    """
    try:
        return util.import_codec(name)
    except KeyError as exc:
        raise CommandError(f'invalid codec: {name}') from exc
def _get_converter(source_codec, target_codec, predicate_modifiers):
src_rep = source_codec.CODEC_INFO['representation'].lower()
tgt_rep = target_codec.CODEC_INFO['representation'].lower()
# The following could be done dynamically by inspecting if the
# target representation has a from_{src_rep} function, but that
# seems like overkill, and it's not clear what to do about
# EDS's predicate_modifiers argument in that case.
if (src_rep, tgt_rep) == ('mrs', 'dmrs'):
from delphin.dmrs import from_mrs
def converter(m):
return from_mrs(m, representative_priority=None)
elif (src_rep, tgt_rep) == ('dmrs', 'mrs'):
from delphin.mrs import from_dmrs as converter
elif (src_rep, tgt_rep) == ('mrs', 'eds'):
from delphin.eds import from_mrs
def converter(m):
return from_mrs(m, predicate_modifiers=predicate_modifiers)
elif src_rep == tgt_rep:
converter = None
else:
raise CommandError(
f'{src_rep.upper()} -> {tgt_rep.upper()}'
' conversion is not supported')
return converter
def _read(path, source_codec, select, kwargs):
if hasattr(path, 'read'):
xs = list(source_codec.load(path, **kwargs))
else:
path = Path(path).expanduser()
if path.is_dir():
db = tsdb.Database(path)
# ts = itsdb.TestSuite(path)
xs = [
next(iter(source_codec.loads(r[0], **kwargs)), None)
for r in tsql.select(select, db)
]
else:
xs = list(source_codec.load(path, **kwargs))
yield from xs
def _read_lines(path, source_codec, kwargs):
    """
    Yield one decoded representation per line of *path*.

    *path* is either an open file (anything with a ``read``
    attribute) or a filename, which is opened for the duration of the
    iteration.
    """
    if not hasattr(path, 'read'):
        with Path(path).expanduser().open() as fh:
            yield from _read_file(fh, source_codec, kwargs)
        return
    yield from _read_file(path, source_codec, kwargs)
def _read_file(fh, source_codec, kwargs):
for line in fh:
yield source_codec.decode(line, **kwargs)
def _iter_convert(converter, xs):
    """
    Yield the items of *xs*, passed through *converter* if one is given.

    Items for which *converter* raises a :exc:`PyDelphinException`
    are logged as errors and skipped.
    """
    logger.info('converting...' if converter else 'no conversion necessary')
    for i, x in enumerate(xs, 1):
        logger.debug('item %d: %r', i, x)
        if not converter:
            yield x
            continue
        try:
            yield converter(x)
        except PyDelphinException:
            logger.error('could not convert item %d', i)
###############################################################################
# SELECT ######################################################################
def select(query: str, path: util.PathLike, record_class=None):
    """
    Select data from [incr tsdb()] test suites.

    The test suite at *path* is opened as a :class:`delphin.tsdb.Database`
    with ``autocast=True`` and queried with :func:`delphin.tsql.select`.

    Args:
        query (str): TSQL select query (e.g., `'i-id i-input mrs'` or
            `'* from item where readings > 0'`)
        path: path to a TSDB test suite
        record_class: alternative class for records in the selection
    Returns:
        the result of :func:`delphin.tsql.select`, i.e., the selected
        data from the test suite
    """
    db = tsdb.Database(path, autocast=True)
    return tsql.select(query, db, record_class=record_class)
###############################################################################
# MKPROF ######################################################################
def mkprof(destination, source=None, schema=None, where=None, delimiter=None,
           refresh=False, skeleton=False, full=False, gzip=False, quiet=False):
    """
    Create [incr tsdb()] profiles or skeletons.

    Data for the testsuite may come from an existing testsuite or from
    a list of sentences. There are four main usage patterns:

    - `source="testsuite/"` -- read data from `testsuite/`
    - `source=None, refresh=True` -- read data from *destination*
    - `source=None, refresh=False` -- read sentences from stdin
    - `source="sents.txt"` -- read sentences from `sents.txt`

    The latter two require the *schema* parameter.

    Args:
        destination (str): path of the new testsuite
        source (str): path to a source testsuite or a file containing
            sentences; if not given and *refresh* is `False`, sentences
            are read from stdin
        schema (str): path to a relations file to use for the created
            testsuite; if `None` and *source* is a test suite, the
            schema of *source* is used
        where (str): TSQL condition to filter records by; ignored if
            *source* is not a testsuite
        delimiter (str): if given, split lines from *source* or stdin
            on the character *delimiter*; if *delimiter* is `"@"`,
            split using :func:`delphin.tsdb.split`; a header line
            with field names is required; ignored when the data source
            is not text lines
        refresh (bool): if `True`, rewrite the data at *destination*;
            implies *full* is `True`; ignored if *source* is not
            `None`, best combined with *schema* or *gzip* (default:
            `False`)
        skeleton (bool): if `True`, only write tsdb-core files
            (default: `False`)
        full (bool): if `True`, copy all data from the source
            testsuite; ignored if the data source is not a testsuite
            or if *skeleton* is `True` (default: `False`)
        gzip (bool): if `True`, non-empty tables will be compressed
            with gzip
        quiet (bool): if `True`, don't print summary information
    Raises:
        CommandError: if *source* is neither a file nor a directory,
            or if text input is given without a *schema*
    """
    destination = Path(destination).expanduser()
    if source is not None:
        source = Path(source).expanduser()
    if schema is not None:
        schema = tsdb.read_schema(schema)
    # relation files of the source data; passed to the cleanup step so
    # files no longer in the schema are considered for removal
    old_relation_files = []

    # work in-place on destination test suite
    if source is None and refresh:
        db = tsdb.Database(destination)
        old_relation_files = list(db.schema)
        tsdb.write_database(db, db.path, schema=schema, gzip=gzip)

    # input is sentences on stdin or a file of sentences
    elif source is None:
        _mkprof_from_lines(
            destination, sys.stdin, schema, delimiter, gzip)
    elif source.is_file():
        with source.open() as fh:
            _mkprof_from_lines(
                destination, fh, schema, delimiter, gzip)

    # input is source testsuite
    elif source.is_dir():
        db = tsdb.Database(source)
        old_relation_files = list(db.schema)
        _mkprof_from_database(
            destination, db, schema, where, full, gzip)

    else:
        raise CommandError(f'invalid source for mkprof: {source!s}')

    _mkprof_cleanup(destination, skeleton, old_relation_files)

    if not quiet:
        _mkprof_summarize(destination, tsdb.read_schema(destination))
def _mkprof_from_lines(destination, stream, schema, delimiter, gzip):
    """
    Create a test suite at *destination* from the text lines of *stream*.

    Only the ``item`` relation is written from the lines.

    Raises:
        CommandError: if no *schema* is given
    """
    if not schema:
        raise CommandError(
            'a schema is required to make a testsuite from text')

    lines = iter(stream)
    colnames, split = _make_split(delimiter, lines)

    # setup destination testsuite
    tsdb.initialize_database(destination, schema, files=True)

    item_fields = schema['item']
    records = _lines_to_records(lines, colnames, split, item_fields)
    tsdb.write(destination, 'item', records, fields=item_fields, gzip=gzip)
def _lines_to_records(lineiter, colnames, split, fields):
    """
    Yield a record for each line of *lineiter*.

    Each line is split with *split* and paired with *colnames*. If
    *fields* has an ``i-id`` field, a missing ``i-id`` is filled with
    the 1-based line number and duplicates are rejected. If *fields*
    has an ``i-length`` field, a missing ``i-length`` is computed as
    the number of whitespace-separated tokens in ``i-input``.

    Raises:
        CommandError: if a line's values do not match *colnames* in
            number, or if an ``i-id`` value is repeated
    """
    field_names = {field.name for field in fields}
    with_i_id = 'i-id' in field_names
    with_i_length = 'i-length' in field_names

    seen_ids = set()
    for lineno, line in enumerate(lineiter, 1):
        colvals = split(line.rstrip('\n'))
        if len(colvals) != len(colnames):
            raise CommandError(
                'line values do not match expected fields:\n'
                f'  fields: {", ".join(colnames)}\n'
                f'  values: {", ".join(colvals)}')
        row = dict(zip(colnames, colvals))

        if with_i_id:
            item_id = row.setdefault('i-id', lineno)
            if item_id in seen_ids:
                raise CommandError(f'duplicate i-id: {item_id}')
            seen_ids.add(item_id)

        needs_length = with_i_length and 'i-length' not in row
        if needs_length and 'i-input' in row:
            row['i-length'] = len(row['i-input'].split())

        yield tsdb.make_record(row, fields)
def _make_split(delimiter, lineiter):
if not delimiter:
def split(line):
return (0, line[1:]) if line.startswith('*') else (1, line)
colnames = ('i-wf', 'i-input')
else:
if delimiter == '@':
split = tsdb.split
else:
def split(line):
return line.split(delimiter)
colnames = split(next(lineiter))
return colnames, split
def _mkprof_from_database(destination, db, schema, where, full, gzip):
    """
    Write the tables of *db* to a test suite at *destination*.

    If *schema* is `None`, the schema of *db* is used. Every table in
    the schema is written, but only those selected for copying (all
    tables if *full*, otherwise the TSDB core files) and present in
    *db* get data; the others are written empty. If *where* is given,
    it filters the copied records.
    """
    if schema is None:
        schema = db.schema

    destination.mkdir(exist_ok=True)
    tsdb.write_schema(destination, schema)

    to_copy = set(schema if full else tsdb.TSDB_CORE_FILES)
    condition = '' if where is None else 'where ' + where

    for table in schema:
        records = []
        if table in to_copy and not _no_such_relation(db, table):
            if condition:
                # filter the data, but use all if the query fails
                # (e.g., if the filter and table cannot be joined)
                try:
                    records = _tsql_distinct(
                        tsql.select(f'* from {table} {condition}', db))
                except tsql.TSQLError:
                    records = list(db[table])
            else:
                records = list(db[table])
        tsdb.write(destination, table, records, schema[table], gzip=gzip)
def _no_such_relation(db, name):
"""
Return True if the relation *name* is not defined in *db* or does
not exist, otherwise False.
"""
if name not in db:
return True
try:
tsdb.get_path(db.path, name)
except tsdb.TSDBError:
return True
return False
def _tsql_distinct(records):
distinct = []
prev = None
for record in records:
if record != prev:
distinct.append(record)
prev = record
return distinct
def _mkprof_cleanup(destination, skeleton, old_files):
    """
    Delete table files at *destination* that should not be kept.

    The plain and ``.gz`` file of each relation in the schema or in
    *old_files* is removed if the relation is not to be kept (i.e.,
    it is not in the schema or, for skeletons, not a TSDB core file)
    or if *skeleton* is true and the file is empty.
    """
    schema = tsdb.read_schema(destination)
    to_keep = set(schema)
    if skeleton:
        to_keep = to_keep.intersection(tsdb.TSDB_CORE_FILES)

    for name in set(schema).union(old_files):
        base = destination.joinpath(name)
        for candidate in (base.with_suffix(''), base.with_suffix('.gz')):
            if not candidate.is_file():
                continue
            if (name not in to_keep
                    or (skeleton and candidate.stat().st_size == 0)):
                candidate.unlink()
def _mkprof_summarize(destination, schema):
    """
    Print the size in bytes of each test suite file at *destination*.

    The ``relations`` file and each relation in *schema* are listed if
    they exist as plain or ``.gz`` files. Names of gzipped files are
    printed in red when stdout is a terminal.
    """
    isatty = sys.stdout.isatty()
    fmt = '{:>8} bytes\t{}'

    for filename in ['relations'] + list(schema):
        plain = destination.joinpath(filename)
        if plain.is_file():
            print(fmt.format(plain.stat().st_size, filename))
            continue
        gzipped = plain.with_suffix('.gz')
        if gzipped.is_file():
            label = filename + '.gz'
            if isatty:
                label = f'\x1b[1;31m{label}\x1b[0m'
            print(fmt.format(gzipped.stat().st_size, label))
###############################################################################
# PROCESS #####################################################################
def _add_where_conditions(select, conditions):
    """Return *select* with *conditions* and-ed onto its where clause."""
    extra = ' and '.join(conditions)
    head, sep, existing = select.partition(' where ')
    if sep:
        # keep the caller's condition intact regardless of precedence
        return f'{head} where ({existing}) and {extra}'
    return f'{select} where {extra}'


def process(grammar, testsuite, source=None, select=None,
            generate=False, transfer=False, full_forest=False,
            options=None, all_items=False, result_id=None, gzip=False,
            stderr=None):
    """
    Process (e.g., parse) a [incr tsdb()] profile.

    Results are written directly to *testsuite*.

    If *select* is `None`, the defaults depend on the task:

        ==========  =========================
        Task        Default value of *select*
        ==========  =========================
        Parsing     `item.i-input`
        Transfer    `result.mrs`
        Generation  `result.mrs`
        ==========  =========================

    Args:
        grammar (str): path to a compiled grammar image
        testsuite (str): path to a [incr tsdb()] testsuite where data
            will be read from (see *source*) and written to
        source (str): path to a [incr tsdb()] testsuite; if `None`,
            *testsuite* is used as the source of data
        select (str): TSQL query for selecting processor inputs
            (default depends on the processor type)
        generate (bool): if `True`, generate instead of parse
            (default: `False`)
        transfer (bool): if `True`, transfer instead of parse
            (default: `False`)
        full_forest (bool): if `True`, pass ``full_forest=True`` to
            the ACE parser; only used when parsing, and mutually
            exclusive with *generate* and *transfer* (default:
            `False`)
        options (list): list of ACE command-line options to use when
            invoking the ACE subprocess; unsupported options will
            give an error message
        all_items (bool): if `True`, don't exclude ignored items
            (those with `i-wf==2`) when parsing
        result_id (int): if given, only keep items with the specified
            `result-id`
        gzip (bool): if `True`, non-empty tables will be compressed
            with gzip
        stderr (file): stream for ACE's stderr
    Raises:
        CommandError: if *grammar* is not a file, if more than one of
            *generate*, *transfer*, and *full_forest* is set, if
            *source* is not a valid TSDB database, or if *testsuite*
            exists but is not a TSDB database
    """
    from delphin import ace

    grammar = Path(grammar).expanduser()
    testsuite = Path(testsuite).expanduser()

    if not grammar.is_file():
        raise CommandError(f'{grammar} is not a file')

    kwargs = {}
    kwargs['stderr'] = stderr
    if sum(1 if mode else 0 for mode in (generate, transfer, full_forest)) > 1:
        raise CommandError("'generate', 'transfer', and 'full-forest' "
                           "are mutually exclusive")

    if source is None:
        source = _validate_tsdb(testsuite)
    else:
        source = _validate_tsdb(source)
        if not tsdb.is_database_directory(testsuite):
            if testsuite.exists():
                raise CommandError(
                    f'{testsuite} exists and is not a TSDB database; '
                    'remove it or select a different destination path')
            mkprof(testsuite, source=source, full=False, quiet=True)
        else:
            pass  # both source and testsuite are valid TSDB databases

    if select is None:
        select = 'result.mrs' if (generate or transfer) else 'item.i-input'

    # Filters are collected and joined with 'and' so the query never
    # ends up with more than one 'where' clause.
    conditions = []
    if generate:
        processor = ace.ACEGenerator
    elif transfer:
        processor = ace.ACETransferer
    else:
        if full_forest:
            kwargs['full_forest'] = True
        if not all_items:
            conditions.append('i-wf != 2')
        processor = ace.ACEParser
    if result_id is not None:
        conditions.append(f'result-id == {result_id}')
    if conditions:
        select = _add_where_conditions(select, conditions)

    target = itsdb.TestSuite(testsuite)
    column, relation, condition = _interpret_selection(select, source)

    with tempfile.TemporaryDirectory() as tmpdir:
        # use a temporary test suite directory for filtered inputs
        mkprof(tmpdir, source=source, where=condition,
               full=True, gzip=True, quiet=True)
        tmp = itsdb.TestSuite(tmpdir)

        with processor(grammar, cmdargs=options, **kwargs) as cpu:
            target.process(cpu,
                           selector=(relation, column),
                           source=tmp,
                           gzip=gzip)
def _interpret_selection(select, source):
    """
    Break *select* into its column, relation, and condition.

    The relation is taken from a qualified column name
    (``item.i-input``), else from a single ``from`` relation, else it
    is guessed as the first relation in the schema of *source* that
    defines the column. The condition is the text following the first
    ``" where "`` in *select*, or the empty string.

    Returns:
        tuple: ``(column, relation, condition)``
    Raises:
        CommandError: if the query does not select exactly one valid
            column of one valid relation
    """
    schema = tsdb.read_schema(source)
    queryobj = tsql.inspect_query('select ' + select)

    projection = queryobj['projection']
    if projection == '*' or len(projection) != 1:
        raise CommandError("select query must return a single column")

    relation, _, column = projection[0].rpartition('.')
    if not relation:
        # query could be 'i-input from item' instead of 'item.i-input'
        relations = queryobj['relations']
        if len(relations) == 1:
            relation = relations[0]
        elif len(relations) > 1:
            raise CommandError(
                "select query may specify no more than 1 relation")
        else:
            # otherwise guess
            candidates = (
                table for table in schema
                if any(f.name == column for f in schema[table]))
            relation = next(candidates, None)

    if relation not in schema:
        raise CommandError('invalid or missing relation in query')
    if not any(f.name == column for f in schema[relation]):
        raise CommandError(f'invalid column in query: {column}')

    _, _, condition = select.partition(' where ')
    return column, relation, condition
###############################################################################
# REPP ########################################################################
def repp(source, config=None, module=None, active=None,
         format=None, color=False, trace_level=0):
    """
    Tokenize with a Regular Expression PreProcessor (REPP).

    Results are printed directly to stdout. If more programmatic
    access is desired, the :mod:`delphin.repp` module provides a
    similar interface.

    Args:
        source (str, file): filename, open file, or stream of sentence
            inputs
        config (str): path to a PET REPP configuration (.set) file
        module (str): path to a top-level REPP module; other modules
            are found by external group calls
        active (list): select which modules are active; if `None`, all
            are used; incompatible with *config* (default: `None`)
        format (str): the output format (`"yy"`, `"string"`, `"line"`,
            or `"triple"`; default: `"yy"`)
        color (bool): apply syntax highlighting if `True` (default:
            `False`)
        trace_level (int): if `0` no trace info is printed; if `1`,
            applied rules are printed, if greater than `1`, both
            applied and unapplied rules (in order) are printed
            (default: `0`)
    Raises:
        CommandError: if *config* is given together with *module* or
            *active*
    """
    from delphin.repp import REPP, REPPResult

    if color:
        highlight = util.make_highlighter('diff')
    else:
        highlight = str

    if config is not None and module is not None:
        raise CommandError("cannot specify both 'config' and 'module'")
    if config is not None and active:
        raise CommandError("'active' cannot be used with 'config'")

    # The documented default is "yy"; without this, format=None would
    # match none of the output branches and print no tokens at all.
    if format is None:
        format = 'yy'
    elif format not in ('yy', 'string', 'line', 'triple'):
        logger.warning(
            'unknown REPP output format %r; no tokens will be printed',
            format)

    if config:
        r = REPP.from_config(config)
    elif module:
        r = REPP.from_file(module, active=active)
    else:
        r = REPP()  # just tokenize

    def _repp(line):
        line = line.rstrip('\n')
        if trace_level > 0:
            # after the loop, *step* is the last item of the trace,
            # which is then tokenized below
            for step in r.trace(line, verbose=True):
                if isinstance(step, REPPResult):
                    print(f'Done:{step.string}')
                elif hasattr(step.operation, 'pattern'):
                    if step.applied:
                        print('Applied:', step.operation)
                        print(highlight(f'-{step.input}\n+{step.output}'))
                    elif trace_level > 1:
                        print('Did not apply:', step.operation)
        else:
            step = r.apply(line)
        res = r.tokenize_result(step)
        if format == 'yy':
            print(res)
        elif format == 'string':
            print(' '.join(t.form for t in res.tokens))
        elif format == 'line':
            for t in res.tokens:
                print(t.form)
            print()
        elif format == 'triple':
            for t in res.tokens:
                if t.lnk.type == Lnk.CHARSPAN:
                    cfrom, cto = t.lnk.data
                else:
                    cfrom, cto = -1, -1
                print(f'({cfrom}, {cto}, {t.form})')
            print()

    if hasattr(source, 'read'):
        for line in source:
            _repp(line)
    else:
        source = Path(source).expanduser()
        with source.open(encoding='utf-8') as fh:
            for line in fh:
                _repp(line)
###############################################################################
# COMPARE #####################################################################
def compare(testsuite, gold, select='i-id i-input mrs'):
    """
    Compare two [incr tsdb()] profiles.

    Rows selected from both profiles are matched on the first
    selected column, and the MRSs in the third column (decoded as
    SimpleMRS) are compared as bags.

    Args:
        testsuite (str, TestSuite): path to the test [incr tsdb()]
            testsuite or a :class:`TestSuite` object
        gold (str, TestSuite): path to the gold [incr tsdb()]
            testsuite or a :class:`TestSuite` object
        select: TSQL query to select (id, input, mrs) triples
            (default: `i-id i-input mrs`)
    Yields:
        dict: Comparison results as::

            {"id": "item identifier",
             "input": "input sentence",
             "test": number_of_unique_results_in_test,
             "shared": number_of_shared_results,
             "gold": number_of_unique_results_in_gold}

    Raises:
        CommandError: if a path is not a valid TSDB database or if
            *select* does not return 3 fields
    """
    from delphin import mrs
    from delphin.codecs import simplemrs

    if not isinstance(testsuite, itsdb.TestSuite):
        testsuite = itsdb.TestSuite(_validate_tsdb(testsuite))
    if not isinstance(gold, itsdb.TestSuite):
        gold = itsdb.TestSuite(_validate_tsdb(gold))

    queryobj = tsql.inspect_query('select ' + select)
    if len(queryobj['projection']) != 3:
        raise CommandError('select does not return 3 fields: ' + select)

    # map item identifiers to inputs, using the test profile only
    input_select = '{} {}'.format(queryobj['projection'][0],
                                  queryobj['projection'][1])
    i_inputs = dict(tsql.select(input_select, testsuite))

    matched_rows = itsdb.match_rows(
        tsql.select(select, testsuite),
        tsql.select(select, gold),
        0)

    for (key, testrows, goldrows) in matched_rows:
        (test_unique, shared, gold_unique) = mrs.compare_bags(
            [simplemrs.decode(row[2]) for row in testrows],
            [simplemrs.decode(row[2]) for row in goldrows])
        yield {'id': key,
               'input': i_inputs.get(key),
               'test': test_unique,
               'shared': shared,
               'gold': gold_unique}
###############################################################################
# HELPERS #####################################################################
def _validate_tsdb(path):
    """
    Return *path* as an expanded :class:`~pathlib.Path`.

    Raises:
        CommandError: if the path is not a TSDB database directory
    """
    resolved = Path(path).expanduser()
    if tsdb.is_database_directory(resolved):
        return resolved
    raise CommandError(f'{resolved} is not a valid TSDB database')