Source code for delphin.interfaces.base


try:
    from collections.abc import Sequence  # Python 3.3+ (removed from collections in 3.10)
except ImportError:
    from collections import Sequence  # Python 2
from datetime import datetime

from delphin.derivation import Derivation
from delphin.tokens import YyTokenLattice
from delphin.mrs import (
    Mrs,
    Dmrs,
    simplemrs,
    eds,
)
from delphin.util import SExpr, stringtypes


class Processor(object):
    """
    Base class for processors.

    This class defines the basic interface for all PyDelphin processors,
    such as :class:`~delphin.interfaces.ace.AceProcess` and
    :class:`~delphin.interfaces.rest.DelphinRestClient`. It can also be
    used to define preprocessor wrappers of other processors such that it
    has the same interface, allowing it to be used, e.g., with
    :meth:`TestSuite.process() <delphin.itsdb.TestSuite.process>`.

    Attributes:
        task: name of the task the processor performs (e.g. `"parse"`,
            `"transfer"`, or `"generate"`)
    """

    task = None

    def process_item(self, datum, keys=None):
        """
        Send *datum* to the processor and return the result.

        This method is a generic wrapper around a processor-specific
        processing method that keeps track of additional item and
        processor information. Specifically, if *keys* is provided, it is
        copied into the `keys` key of the response object, and if the
        processor object's `task` member is non-`None`, it is copied into
        the `task` key of the response. These help with keeping track of
        items when many are processed at once, and help downstream
        functions identify what the process did.

        Args:
            datum: the item content to process
            keys: a mapping of item identifiers which will be copied into
                the response
        """
        raise NotImplementedError()
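

# Illustrative sketch, not part of the original module: a hypothetical
# Processor subclass showing how `task` and `process_item()` are meant to
# interact. `EchoProcessor` and its trivial behavior are invented here;
# real processors (e.g., AceProcess) delegate to an actual parser.
class EchoProcessor(Processor):
    task = 'parse'

    def process_item(self, datum, keys=None):
        # a real processor would produce results here; this sketch just
        # wraps the input in an empty ParseResponse
        response = ParseResponse({'input': datum, 'results': []})
        if keys is not None:
            response['keys'] = keys
        if self.task is not None:
            response['task'] = self.task
        return response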


class ParseResult(dict):
    """
    A wrapper around a result dictionary to automate deserialization for
    supported formats.

    A ParseResult is still a dictionary, so the raw data can be obtained
    using dict access.
    """

    def __repr__(self):
        return 'ParseResult({})'.format(dict.__repr__(self))

    def derivation(self):
        """
        Deserialize and return a Derivation object for UDF- or
        JSON-formatted derivation data; otherwise return the original
        string.
        """
        drv = self.get('derivation')
        if drv is not None:
            if isinstance(drv, dict):
                drv = Derivation.from_dict(drv)
            elif isinstance(drv, stringtypes):
                drv = Derivation.from_string(drv)
        return drv

    def tree(self):
        """
        Deserialize and return a labeled syntax tree. The tree data may
        be a standalone datum, or embedded in the derivation.
        """
        tree = self.get('tree')

        if isinstance(tree, stringtypes):
            tree = SExpr.parse(tree).data

        elif tree is None:
            # no standalone tree; try to extract one from a
            # JSON-formatted derivation
            drv = self.get('derivation')
            if isinstance(drv, dict) and 'label' in drv:
                def _extract_tree(d):
                    t = [d.get('label', '')]
                    if 'tokens' in d:
                        # terminal node; use the surface form as the leaf
                        t.append([d.get('form', '')])
                    else:
                        for dtr in d.get('daughters', []):
                            t.append(_extract_tree(dtr))
                    return t
                tree = _extract_tree(drv)

        return tree

    def mrs(self):
        """
        Deserialize and return an Mrs object for simplemrs or
        JSON-formatted MRS data; otherwise return the original string.
        """
        mrs = self.get('mrs')
        if mrs is not None:
            if isinstance(mrs, dict):
                mrs = Mrs.from_dict(mrs)
            elif isinstance(mrs, stringtypes):
                mrs = simplemrs.loads_one(mrs)
        return mrs

    def eds(self):
        """
        Deserialize and return an Eds object for native- or
        JSON-formatted EDS data; otherwise return the original string.
        """
        _eds = self.get('eds')
        if _eds is not None:
            if isinstance(_eds, dict):
                _eds = eds.Eds.from_dict(_eds)
            elif isinstance(_eds, stringtypes):
                _eds = eds.loads_one(_eds)
        return _eds

    def dmrs(self):
        """
        Deserialize and return a Dmrs object for JSON-formatted DMRS
        data; otherwise return the original string.
        """
        dmrs = self.get('dmrs')
        if dmrs is not None:
            if isinstance(dmrs, dict):
                dmrs = Dmrs.from_dict(dmrs)
        return dmrs
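

# Illustrative sketch, not part of the original module: raw values remain
# available through dict access while the accessor methods deserialize on
# demand; derivation(), tree(), eds(), and dmrs() follow the same pattern
# as mrs() below. The SimpleMRS string is a minimal, hypothetical payload.
def _example_result_access():
    result = ParseResult({'mrs': '[ TOP: h0 RELS: < > HCONS: < > ]'})
    raw = result['mrs']          # the original SimpleMRS string
    deserialized = result.mrs()  # an Mrs object via simplemrs.loads_one()
    return raw, deserialized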


class ParseResponse(dict):
    """
    A wrapper around the response dictionary for more convenient access
    to results.
    """

    _result_factory = ParseResult

    def __repr__(self):
        return 'ParseResponse({})'.format(dict.__repr__(self))

    def results(self):
        """Return ParseResult objects for each result."""
        return [self._result_factory(r) for r in self.get('results', [])]

    def result(self, i):
        """Return a ParseResult object for the *i*\ th result."""
        return self._result_factory(self.get('results', [])[i])

    def tokens(self, tokenset='internal'):
        """
        Deserialize and return a YyTokenLattice object for the initial
        or internal token set, if provided, from the YY format or the
        JSON-formatted data; otherwise return the original string.

        Args:
            tokenset (str): return `'initial'` or `'internal'` tokens
                (default: `'internal'`)
        Returns:
            :class:`YyTokenLattice`
        """
        toks = self.get('tokens', {}).get(tokenset)
        if toks is not None:
            if isinstance(toks, stringtypes):
                toks = YyTokenLattice.from_string(toks)
            elif isinstance(toks, Sequence):
                toks = YyTokenLattice.from_list(toks)
        return toks
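

# Illustrative sketch, not part of the original module: results() wraps
# each raw result dict in the class's _result_factory (ParseResult), so
# the deserialization methods above become available on each one. The
# response payload here is hypothetical.
def _example_response_access():
    response = ParseResponse({'results': [{'result-id': 0},
                                          {'result-id': 1}]})
    first = response.result(0)   # a single ParseResult
    every = response.results()   # all results, wrapped
    return first, every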


class FieldMapper(object):
    """
    A class for mapping responses to [incr tsdb()] fields.

    This class provides two methods for mapping responses to fields:

    * map() - takes a response and returns a list of (table, data)
      tuples for the data in the response, as well as aggregating any
      necessary information
    * cleanup() - returns any (table, data) tuples resulting from
      aggregated data over all runs, then clears this data

    In addition, the :attr:`affected_tables` attribute should list the
    names of tables that become invalidated by using this FieldMapper to
    process a profile. Generally this is the list of tables that
    :meth:`map` and :meth:`cleanup` create records for, but it may also
    include those that rely on the previous set (e.g., treebanking
    preferences, etc.).

    Alternative [incr tsdb()] schemata can be handled by overriding
    these two methods and the __init__() method.

    Attributes:
        affected_tables: list of tables that are affected by the
            processing
    """

    def __init__(self):
        # the parse keys exclude some that are handled specially
        self._parse_keys = '''
            ninputs ntokens readings first total tcpu tgc treal words
            l-stasks p-ctasks p-ftasks p-etasks p-stasks
            aedges pedges raedges rpedges tedges eedges ledges sedges
            redges unifications copies conses symbols others gcs
            i-load a-load date error comment
        '''.split()
        self._result_keys = '''
            result-id time r-ctasks r-ftasks r-etasks r-stasks size
            r-aedges r-pedges derivation surface tree mrs
        '''.split()
        self._run_keys = '''
            run-comment platform protocol tsdb application environment
            grammar avms sorts templates lexicon lrules rules
            user host os start end items status
        '''.split()
        self._parse_id = -1
        self._runs = {}
        self._last_run_id = -1
        self.affected_tables = '''
            run parse result rule output edge tree decision preference
            update fold score
        '''.split()

    def map(self, response):
        """
        Process *response* and return a list of (table, rowdata) tuples.
        """
        inserts = []

        parse = {}
        # custom remapping, cleanup, and filling in holes
        parse['i-id'] = response.get('keys', {}).get('i-id', -1)
        self._parse_id = max(self._parse_id + 1, parse['i-id'])
        parse['parse-id'] = self._parse_id
        parse['run-id'] = response.get('run', {}).get('run-id', -1)
        if 'tokens' in response:
            parse['p-input'] = response['tokens'].get('initial')
            parse['p-tokens'] = response['tokens'].get('internal')
            if 'ninputs' not in response:
                toks = response.tokens('initial')
                if toks is not None:
                    response['ninputs'] = len(toks.tokens)
            if 'ntokens' not in response:
                toks = response.tokens('internal')
                if toks is not None:
                    response['ntokens'] = len(toks.tokens)
        if 'readings' not in response and 'results' in response:
            response['readings'] = len(response['results'])
        # basic mapping
        for key in self._parse_keys:
            if key in response:
                parse[key] = response[key]
        inserts.append(('parse', parse))

        for result in response.get('results', []):
            d = {'parse-id': self._parse_id}
            if 'flags' in result:
                d['flags'] = SExpr.format(result['flags'])
            for key in self._result_keys:
                if key in result:
                    d[key] = result[key]
            inserts.append(('result', d))

        if 'run' in response:
            run_id = response['run'].get('run-id', -1)
            # check if the last run was not closed properly
            if run_id not in self._runs and self._last_run_id in self._runs:
                last_run = self._runs[self._last_run_id]
                if 'end' not in last_run:
                    last_run['end'] = datetime.now()
            self._runs[run_id] = response['run']
            self._last_run_id = run_id

        return inserts

    def cleanup(self):
        """
        Return aggregated (table, rowdata) tuples and clear the state.
        """
        inserts = []

        last_run = self._runs[self._last_run_id]
        if 'end' not in last_run:
            last_run['end'] = datetime.now()

        for run_id in sorted(self._runs):
            run = self._runs[run_id]
            d = {'run-id': run.get('run-id', -1)}
            for key in self._run_keys:
                if key in run:
                    d[key] = run[key]
            inserts.append(('run', d))

        # reset for the next task
        self._parse_id = -1
        self._runs = {}
        self._last_run_id = -1

        return inserts
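

# Illustrative sketch, not part of the original module: a typical
# map()/cleanup() cycle. The response payload below is hypothetical and
# minimal; in practice it would come from a Processor such as AceProcess.
def _example_field_mapping():
    mapper = FieldMapper()
    response = ParseResponse({
        'keys': {'i-id': 10},
        'run': {'run-id': 0, 'platform': 'example'},
        'results': [{'result-id': 0, 'surface': 'an example sentence'}],
    })
    rows = mapper.map(response)    # ('parse', ...) and ('result', ...) rows
    rows.extend(mapper.cleanup())  # ('run', ...) rows; state is reset
    return rows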