Source code for delphin.mrs.xmrs


"""
Classes and functions for general \*MRS processing.
"""

from collections import (defaultdict, deque)
from itertools import chain

from delphin.exceptions import (XmrsError, XmrsStructureError)
from delphin.util import safe_int, _bfs, _connected_components
from .components import (
    ElementaryPredication, HandleConstraint, IndividualConstraint,
    Lnk, _LnkMixin, var_re, var_sort, _VarGenerator,
    Pred, Node, nodes, Link, links
)
from .config import (
    HANDLESORT, UNKNOWNSORT, LTOP_NODEID, FIRST_NODEID,
    IVARG_ROLE, CONSTARG_ROLE, RSTR_ROLE, BARE_EQ_ROLE,
    EQ_POST, HEQ_POST, H_POST, NIL_POST, CVARSORT
)


[docs]class Xmrs(_LnkMixin): """ Xmrs is a common class for Mrs, Rmrs, and Dmrs objects. Args: top: the TOP (or maybe LTOP) variable index: the INDEX variable xarg: the XARG variable eps: an iterable of EPs (see above) hcons: an iterable of HCONS (see above) icons: an iterable of ICONS (see above) vars: a mapping of variable to a list of property-value pairs lnk: the Lnk object associating the Xmrs to the surface form surface: the surface string identifier: a discourse-utterance id Xmrs can be instantiated directly, but it may be more convenient to use the :func:`Mrs`, :func:`Rmrs`, or :func:`Dmrs` constructor functions. Variables are simply strings, but must be of the proper form in order to be recognized as variables and not constants. The form is basically a sequence of non-integers followed by a sequence of integers, but see :data:`delphin.mrs.components.var_re` for the regular expression used to determine a match. The *eps* argument is an iterable of tuples representing ElementaryPredications. These can be objects of the ElementaryPredication class itself, or an equivalent tuple. The same goes for *hcons* and *icons* with the HandleConstraint and IndividualConstraint classes, respectively. Attributes: top: the top (i.e. LTOP) handle index: the semantic index xarg: the external argument lnk (:class:`~delphin.mrs.components.Lnk`): surface alignment surface: the surface string identifier: a discourse-utterance ID (often unset) """ def __init__(self, top=None, index=None, xarg=None, eps=None, hcons=None, icons=None, vars=None, lnk=None, surface=None, identifier=None): self.top = top self.index = index self.xarg = xarg self._nodeids = [] self._eps = {} self._hcons = {} self._icons = {} self._vars = defaultdict( lambda: {'props': [], 'refs': defaultdict(list)} ) # just calling __getitem__ will instantiate them on _vars if top is not None: self._vars[top] if index is not None: self._vars[index] if xarg is not None: self._vars[xarg] if vars is not None: _vars = self._vars for var, props in vars.items(): if hasattr(props, 'items'): props = list(props.items()) _vars[var]['props'] = props if eps is not None: self.add_eps(eps) if hcons is not None: self.add_hcons(hcons) if icons is not None: self.add_icons(icons) #: A Lnk object to associate the Xmrs to the surface form self.lnk = lnk # Lnk object (MRS-level lnk spans the whole input) #: The surface string self.surface = surface # The surface string #: A discourse-utterance id self.identifier = identifier # Associates an utterance with the RMRS
[docs] @classmethod def from_xmrs(cls, xmrs, **kwargs): """ Facilitate conversion among subclasses. Args: xmrs (:class:`Xmrs`): instance to convert from; possibly an instance of a subclass, such as :class:`Mrs` or :class:`Dmrs` **kwargs: additional keyword arguments that may be used by a subclass's redefinition of :meth:`from_xmrs`. """ x = cls() x.__dict__.update(xmrs.__dict__) return x
[docs] def add_eps(self, eps): """ Incorporate the list of EPs given by *eps*. """ # (nodeid, pred, label, args, lnk, surface, base) _nodeids, _eps, _vars = self._nodeids, self._eps, self._vars for ep in eps: try: if not isinstance(ep, ElementaryPredication): ep = ElementaryPredication(*ep) except TypeError: raise XmrsError('Invalid EP data: {}'.format(repr(ep))) # eplen = len(ep) # if eplen < 3: # raise XmrsError( # 'EPs must have length >= 3: (nodeid, pred, label, ...)' # ) nodeid, lbl = ep.nodeid, ep.label if nodeid in _eps: raise XmrsError( 'EP already exists in Xmrs: {} ({})' .format(nodeid, ep[1]) ) _nodeids.append(nodeid) _eps[nodeid] = ep if lbl is not None: _vars[lbl]['refs']['LBL'].append(nodeid) for role, val in ep.args.items(): # if the val is not in _vars, it might still be a # variable; check with var_re if val in _vars or var_re.match(val): vardict = _vars[val] vardict['refs'][role].append(nodeid)
# if role == IVARG_ROLE: # if pred.is_quantifier(): # vardict['bv'] = nodeid # else: # vardict['iv'] = nodeid
[docs] def add_hcons(self, hcons): """ Incorporate the list of HandleConstraints given by *hcons*. """ # (hi, relation, lo) _vars = self._vars _hcons = self._hcons for hc in hcons: try: if not isinstance(hc, HandleConstraint): hc = HandleConstraint(*hc) except TypeError: raise XmrsError('Invalid HCONS data: {}'.format(repr(hc))) hi = hc.hi lo = hc.lo if hi in _hcons: raise XmrsError( 'Handle constraint already exists for hole %s.' % hi ) _hcons[hi] = hc # the following should also ensure lo and hi are in _vars if 'hcrefs' not in _vars[lo]: _vars[lo]['hcrefs'] = [] for role, refs in _vars[hi]['refs'].items(): for nodeid in refs: _vars[lo]['hcrefs'].append((nodeid, role, hi))
[docs] def add_icons(self, icons): """ Incorporate the individual constraints given by *icons*. """ _vars, _icons = self._vars, self._icons for ic in icons: try: if not isinstance(ic, IndividualConstraint): ic = IndividualConstraint(*ic) except TypeError: raise XmrsError('Invalid ICONS data: {}'.format(repr(ic))) left = ic.left right = ic.right if left not in _icons: _icons[left] = [] _icons[left].append(ic) # the following should also ensure left and right are in _vars if 'icrefs' not in _vars[right]: _vars[right]['icrefs'] = [] _vars[right]['icrefs'].append(ic) _vars[left] # just to instantiate if not done yet
def __repr__(self): if self.surface is not None: stringform = '"{}"'.format(self.surface) else: stringform = ' '.join(ep[1].lemma for ep in self.eps()) return '<{} object ({}) at {}>'.format( self.__class__.__name__, stringform, id(self) ) def __contains__(self, obj): return obj in self._eps or obj in self._vars def __eq__(self, other): # actual equality is more than isomorphism, all variables and # things must have the same form, not just the same shape if not isinstance(other, Xmrs): return NotImplemented if ((self.top, self.index, self.xarg) != (other.top, other.index, other.xarg)): return False a, b = sorted(self.eps()), sorted(other.eps()) if len(a) != len(b) or any(ep1 != ep2 for ep1, ep2 in zip(a, b)): return False a, b = sorted(self.hcons()), sorted(other.hcons()) if len(a) != len(b) or any(hc1 != hc2 for hc1, hc2 in zip(a, b)): return False a, b = sorted(self.icons()), sorted(other.icons()) if len(a) != len(b) or any(ic1 != ic2 for ic1, ic2 in zip(a, b)): return False for v in self.variables(): if self.properties(v) != other.properties(v): return False return True @property def ltop(self): """ The top handle if specified; `None` otherwise. Note: Equivalent to :attr:`top` """ return self.top # basic access to internal structures
[docs] def nodeid(self, iv, quantifier=False): """ Return the nodeid of the predication selected by *iv*. Args: iv: the intrinsic variable of the predication to select quantifier: if `True`, treat *iv* as a bound variable and find its quantifier; otherwise the non-quantifier will be returned """ return next(iter(self.nodeids(ivs=[iv], quantifier=quantifier)), None)
[docs] def nodeids(self, ivs=None, quantifier=None): """ Return the list of nodeids given by *ivs*, or all nodeids. Args: ivs: the intrinsic variables of the predications to select; if `None`, return all nodeids (but see *quantifier*) quantifier: if `True`, only return nodeids of quantifiers; if `False`, only return non-quantifiers; if `None` (the default), return both """ if ivs is None: nids = list(self._nodeids) else: _vars = self._vars nids = [] for iv in ivs: if iv in _vars and IVARG_ROLE in _vars[iv]['refs']: nids.extend(_vars[iv]['refs'][IVARG_ROLE]) else: raise KeyError(iv) if quantifier is not None: nids = [n for n in nids if self.ep(n).is_quantifier()==quantifier] return nids
[docs] def ep(self, nodeid): """ Return the ElementaryPredication with the given *nodeid*. """ return self._eps[nodeid]
[docs] def eps(self, nodeids=None): """ Return the EPs with the given *nodeid*, or all EPs. Args: nodeids: an iterable of nodeids of EPs to return; if `None`, return all EPs """ if nodeids is None: nodeids = self._nodeids _eps = self._eps return [_eps[nodeid] for nodeid in nodeids]
[docs] def hcon(self, hi): """ Return the HandleConstraint with high variable *hi*. """ return self._hcons[hi]
[docs] def hcons(self): """ Return the list of HCONS. """ return list(self._hcons.values())
[docs] def icons(self, left=None): """ Return the ICONS with left variable *left*, or all ICONS. Args: left: the left variable of the ICONS to return; if `None`, return all ICONS """ if left is not None: return self._icons[left] else: return list(chain.from_iterable(self._icons.values()))
[docs] def variables(self): """ Return the list of all variables. """ return list(self._vars)
# access to internal sub-structures
[docs] def properties(self, var_or_nodeid, as_list=False): """ Return a dictionary of variable properties for *var_or_nodeid*. Args: var_or_nodeid: if a variable, return the properties associated with the variable; if a nodeid, return the properties associated with the intrinsic variable of the predication given by the nodeid """ props = [] if var_or_nodeid in self._vars: props = self._vars[var_or_nodeid]['props'] elif var_or_nodeid in self._eps: var = self._eps[var_or_nodeid][3].get(IVARG_ROLE) props = self._vars.get(var, {}).get('props', []) else: raise KeyError(var_or_nodeid) if not as_list: props = dict(props) return props
[docs] def pred(self, nodeid): """ Return the Pred object for the predications given by *nodeid*. """ return self._eps[nodeid][1]
[docs] def preds(self, nodeids=None): """ Return the Pred objects for *nodeids*, or all Preds. Args: nodeids: an iterable of nodeids of predications to return Preds from; if `None`, return all Preds """ if nodeids is None: nodeids = self._nodeids _eps = self._eps return [_eps[nid][1] for nid in nodeids]
[docs] def label(self, nodeid): """ Return the label of the predication given by *nodeid* """ return self._eps[nodeid][2]
[docs] def labels(self, nodeids=None): """ Return the list of labels for *nodeids*, or all labels. Args: nodeids: an iterable of nodeids for predications to get labels from; if `None`, return labels for all predications Note: This returns the label of each predication, even if it's shared by another predication. Thus, `zip(nodeids, xmrs.labels(nodeids))` will pair nodeids with their labels. Returns: A list of labels """ if nodeids is None: nodeids = self._nodeids _eps = self._eps return [_eps[nid][2] for nid in nodeids]
[docs] def args(self, nodeid): """ Return the arguments for the predication given by *nodeid*. All arguments (including intrinsic and constant arguments) are included. MOD/EQ links are not considered arguments. If only arguments that target other predications are desired, see :meth:`outgoing_args`. Args: nodeid: the nodeid of the EP that is the arguments' source Returns: dict: `{role: tgt}` """ return dict(self._eps[nodeid][3])
# calculated sub-structures
[docs] def outgoing_args(self, nodeid): """ Return the arguments going from *nodeid* to other predications. Valid arguments include regular variable arguments and scopal (label-selecting or HCONS) arguments. MOD/EQ links, intrinsic arguments, and constant arguments are not included. Args: nodeid: the nodeid of the EP that is the arguments' source Returns: dict: `{role: tgt}` """ _vars = self._vars _hcons = self._hcons args = self.args(nodeid) # args is a copy; we can edit it for arg, val in list(args.items()): # don't include constant args or intrinsic args if arg == IVARG_ROLE or val not in _vars: del args[arg] else: refs = _vars[val]['refs'] # don't include if not HCONS or pointing to other IV or LBL if not (val in _hcons or IVARG_ROLE in refs or 'LBL' in refs): del args[arg] return args
[docs] def incoming_args(self, nodeid): """ Return the arguments that target *nodeid*. Valid arguments include regular variable arguments and scopal (label-selecting or HCONS) arguments. MOD/EQ links and intrinsic arguments are not included. Args: nodeid: the nodeid of the EP that is the arguments' target Returns: dict: `{source_nodeid: {rargname: value}}` """ _vars = self._vars ep = self._eps[nodeid] lbl = ep[2] iv = ep[3].get(IVARG_ROLE) in_args_list = [] # variable args if iv in _vars: for role, nids in _vars[iv]['refs'].items(): # ignore intrinsic args, even if shared if role != IVARG_ROLE: in_args_list.append((nids, role, iv)) if lbl in _vars: for role, nids in _vars[lbl]['refs'].items(): # basic label equality isn't "incoming"; ignore if role != 'LBL': in_args_list.append((nids, role, lbl)) for nid, role, hi in _vars[lbl].get('hcrefs', []): in_args_list.append(([nid], role, hi)) in_args = {} for nids, role, tgt in in_args_list: for nid in nids: if nid not in in_args: in_args[nid] = {} in_args[nid][role] = tgt return in_args
[docs] def labelset(self, label): """ Return the set of nodeids for predications that share *label*. Args: label: the label that returned nodeids share. Returns: A set of nodeids, which may be an empty set. """ return self._vars[label]['refs']['LBL']
[docs] def labelset_heads(self, label): """ Return the heads of the labelset selected by *label*. Args: label: the label from which to find head nodes/EPs. Returns: An iterable of nodeids. """ _eps = self._eps _vars = self._vars _hcons = self._hcons nodeids = {nodeid: _eps[nodeid][3].get(IVARG_ROLE, None) for nodeid in _vars[label]['refs']['LBL']} if len(nodeids) <= 1: return list(nodeids) scope_sets = {} for nid in nodeids: scope_sets[nid] = _ivs_in_scope(nid, _eps, _vars, _hcons) out = {} for n in nodeids: out[n] = 0 for role, val in _eps[n][3].items(): if role == IVARG_ROLE or role == CONSTARG_ROLE: continue elif any(val in s for n2, s in scope_sets.items() if n2 != n): out[n] += 1 candidates = [n for n, out_deg in out.items() if out_deg == 0] rank = {} for n in candidates: iv = nodeids[n] pred = _eps[n][1] if iv in _vars and self.nodeid(iv, quantifier=True) is not None: rank[n] = 0 elif pred.is_quantifier(): rank[n] = 0 elif pred.type == Pred.ABSTRACT: rank[n] = 2 else: rank[n] = 1 return sorted(candidates, key=lambda n: rank[n])
[docs] def subgraph(self, nodeids): """ Return an Xmrs object with only the specified *nodeids*. Necessary variables and arguments are also included in order to connect any nodes that are connected in the original Xmrs. Args: nodeids: the nodeids of the nodes/EPs to include in the subgraph. Returns: An :class:`Xmrs` object. """ _eps, _vars = self._eps, self._vars _hcons, _icons = self._hcons, self._icons top = index = xarg = None eps = [_eps[nid] for nid in nodeids] lbls = set(ep[2] for ep in eps) hcons = [] icons = [] subvars = {} if self.top: top = self.top tophc = _hcons.get(top, None) if tophc is not None and tophc[2] in lbls: subvars[top] = {} elif top not in lbls: top = None # nevermind, set it back to None # do index after we know if it is an EPs intrinsic variable. # what about xarg? I'm not really sure.. just put it in if self.xarg: xarg = self.xarg subvars[self.xarg] = _vars[self.xarg]['props'] subvars.update((lbl, {}) for lbl in lbls) subvars.update( (var, _vars[var]['props']) for ep in eps for var in ep[3].values() if var in _vars ) if self.index in subvars: index = self.index # hcons and icons; only if the targets exist in the new subgraph for var in subvars: hc = _hcons.get(var, None) if hc is not None and hc[2] in lbls: hcons.append(hc) for ic in _icons.get(var, []): if ic[0] in subvars and ic[2] in subvars: icons.append(ic) return Xmrs( top=top, index=index, xarg=xarg, eps=eps, hcons=hcons, icons=icons, vars=subvars, lnk=self.lnk, surface=self.surface, identifier=self.identifier )
[docs] def is_connected(self): """ Return `True` if the Xmrs represents a connected graph. Subgraphs can be connected through things like arguments, QEQs, and label equalities. """ nids = set(self._nodeids) # the nids left to find if len(nids) == 0: raise XmrsError('Cannot compute connectedness of an empty Xmrs.') # build a basic dict graph of relations edges = [] # label connections for lbl in self.labels(): lblset = self.labelset(lbl) edges.extend((x, y) for x in lblset for y in lblset if x != y) # argument connections _vars = self._vars for nid in nids: for rarg, tgt in self.args(nid).items(): if tgt not in _vars: continue if IVARG_ROLE in _vars[tgt]['refs']: tgtnids = list(_vars[tgt]['refs'][IVARG_ROLE]) elif tgt in self._hcons: tgtnids = list(self.labelset(self.hcon(tgt)[2])) elif 'LBL' in _vars[tgt]['refs']: tgtnids = list(_vars[tgt]['refs']['LBL']) else: tgtnids = [] # connections are bidirectional edges.extend((nid, t) for t in tgtnids if nid != t) edges.extend((t, nid) for t in tgtnids if nid != t) g = {nid: set() for nid in nids} for x, y in edges: g[x].add(y) connected_nids = _bfs(g) if connected_nids == nids: return True elif connected_nids.difference(nids): raise XmrsError( 'Possibly bogus nodeids: {}' .format(', '.join(connected_nids.difference(nids))) ) return False
[docs] def is_well_formed(self): """ Return `True` if the Xmrs is well-formed, `False` otherwise. See :meth:`validate` """ try: self.validate() except XmrsError: return False return True
[docs] def validate(self): """ Check that the Xmrs is well-formed. The Xmrs is analyzed and a list of problems is compiled. If any problems exist, an :exc:`XmrsError` is raised with the list joined as the error message. A well-formed Xmrs has the following properties: * All predications have an intrinsic variable * Every intrinsic variable belongs one predication and maybe one quantifier * Every predication has no more than one quantifier * All predications have a label * The graph of predications form a net (i.e. are connected). Connectivity can be established with variable arguments, QEQs, or label-equality. * The lo-handle for each QEQ must exist as the label of a predication """ errors = [] ivs, bvs = {}, {} _vars = self._vars _hcons = self._hcons labels = defaultdict(set) # ep_args = {} for ep in self.eps(): nid, lbl, args, is_q = ( ep.nodeid, ep.label, ep.args, ep.is_quantifier() ) if lbl is None: errors.append('EP ({}) is missing a label.'.format(nid)) labels[lbl].add(nid) iv = args.get(IVARG_ROLE) if iv is None: errors.append('EP {nid} is missing an intrinsic variable.' .format(nid)) if is_q: if iv in bvs: errors.append('{} is the bound variable for more than ' 'one quantifier.'.format(iv)) bvs[iv] = nid else: if iv in ivs: errors.append('{} is the intrinsic variable for more ' 'than one EP.'.format(iv)) ivs[iv] = nid # ep_args[nid] = args for hc in _hcons.values(): if hc[2] not in labels: errors.append('Lo variable of HCONS ({} {} {}) is not the ' 'label of any EP.'.format(*hc)) if not self.is_connected(): errors.append('Xmrs structure is not connected.') if errors: raise XmrsError('\n'.join(errors))
[docs]class Mrs(Xmrs): """ Construct an :class:`Xmrs` using MRS components. Formally, Minimal Recursion Semantics (MRS) have a top handle, a bag of Elementary Predications, and a bag of Handle Constraints. All arguments, including intrinsic arguments and constant arguments, are expected to be contained by the EPs. Args: top: the TOP (or LTOP) variable index: the INDEX variable xarg: the XARG variable rels: an iterable of ElementaryPredications hcons: an iterable of HandleConstraints icons: an iterable of IndividualConstraints lnk: the Lnk object associating the MRS to the surface form surface: the surface string identifier: a discourse-utterance id vars: a mapping of variables to a list of (property, value) pairs Example: >>> m = Mrs( >>> top='h0', >>> index='e2', >>> rels=[ElementaryPredication( >>> Pred.surface('_rain_v_1_rel'), >>> label='h1', >>> args={'ARG0': 'e2'}, >>> vars={'e2': {'SF': 'prop-or-ques', 'TENSE': 'present'}} >>> )], >>> hcons=[HandleConstraint('h0', 'qeq', 'h1')] >>> ) """ def __init__( self, top=None, index=None, xarg=None, rels=None, hcons=None, icons=None, lnk=None, surface=None, identifier=None, vars=None): eps = list(rels or []) hcons = list(hcons or []) icons = list(icons or []) if vars is None: vars = {} # first give eps a nodeid (this is propagated to args) next_nodeid = FIRST_NODEID for ep in eps: if ep.nodeid is not None and ep.nodeid >= next_nodeid: next_nodeid = ep.nodeid + 1 eps_ = [] for i, ep in enumerate(eps): if ep.nodeid is None: eps_.append(tuple([next_nodeid + i] + list(ep[1:]))) else: eps_.append(ep) super(Mrs, self).__init__( top=top, index=index, xarg=xarg, eps=eps_, hcons=hcons, icons=icons, vars=vars, lnk=lnk, surface=surface, identifier=identifier )
[docs] def to_dict(self, short_pred=True, properties=True): """ Encode the Mrs as a dictionary suitable for JSON serialization. """ def _lnk(obj): return {'from': obj.cfrom, 'to': obj.cto} def _ep(ep, short_pred=True): p = ep.pred.short_form() if short_pred else ep.pred.string d = dict(label=ep.label, predicate=p, arguments=ep.args) if ep.lnk is not None: d['lnk'] = _lnk(ep) return d def _hcons(hc): return {'relation':hc[1], 'high':hc[0], 'low':hc[2]} def _icons(ic): return {'relation':ic[1], 'left':ic[0], 'right':ic[2]} def _var(v): d = {'type': var_sort(v)} if properties and self.properties(v): d['properties'] = self.properties(v) return d d = dict( relations=[_ep(ep, short_pred=short_pred) for ep in self.eps()], constraints=([_hcons(hc) for hc in self.hcons()] + [_icons(ic) for ic in self.icons()]), variables={v: _var(v) for v in self.variables()} ) if self.top is not None: d['top'] = self.top if self.index is not None: d['index'] = self.index # if self.xarg is not None: d['xarg'] = self.xarg # if self.lnk is not None: d['lnk'] = self.lnk # if self.surface is not None: d['surface'] = self.surface # if self.identifier is not None: d['identifier'] = self.identifier return d
[docs] @classmethod def from_dict(cls, d): """ Decode a dictionary, as from :meth:`to_dict`, into an Mrs object. """ def _lnk(o): return None if o is None else Lnk.charspan(o['from'], o['to']) def _ep(ep): return ElementaryPredication( nodeid=None, pred=Pred.surface_or_abstract(ep['predicate']), label=ep['label'], args=ep.get('arguments', {}), lnk=_lnk(ep.get('lnk')), surface=ep.get('surface'), base=ep.get('base') ) eps = [_ep(rel) for rel in d.get('relations', [])] hcons = [(c['high'], c['relation'], c['low']) for c in d.get('constraints', []) if 'high' in c] icons = [(c['high'], c['relation'], c['low']) for c in d.get('constraints', []) if 'left' in c] variables = {var: list(data.get('properties', {}).items()) for var, data in d.get('variables', {}).items()} return cls( top=d.get('top'), index=d.get('index'), xarg=d.get('xarg'), rels=eps, hcons=hcons, icons=icons, lnk=_lnk(d.get('lnk')), surface=d.get('surface'), identifier=d.get('identifier'), vars=variables )
[docs]def Rmrs(top=None, index=None, xarg=None, eps=None, args=None, hcons=None, icons=None, lnk=None, surface=None, identifier=None, vars=None): """ Construct an :class:`Xmrs` from RMRS components. Robust Minimal Recursion Semantics (RMRS) are like MRS, but all predications have a nodeid ("anchor"), and arguments are not contained by the source predications, but instead reference the nodeid of their predication. Args: top: the TOP (or maybe LTOP) variable index: the INDEX variable xarg: the XARG variable eps: an iterable of EPs args: a nested mapping of `{nodeid: {rargname: value}}` hcons: an iterable of HandleConstraint objects icons: an iterable of IndividualConstraint objects lnk: the Lnk object associating the MRS to the surface form surface: the surface string identifier: a discourse-utterance id vars: a mapping of variables to a list of `(property, value)` pairs Example: >>> m = Rmrs( >>> top='h0', >>> index='e2', >>> eps=[ElementaryPredication( >>> 10000, >>> Pred.surface('_rain_v_1_rel'), >>> 'h1' >>> )], >>> args={10000: {'ARG0': 'e2'}}, >>> hcons=[HandleConstraint('h0', 'qeq', 'h1'), >>> vars={'e2': {'SF': 'prop-or-ques', 'TENSE': 'present'}} >>> ) """ eps = list(eps or []) args = list(args or []) if vars is None: vars = {} for arg in args: if arg.nodeid is None: raise XmrsStructureError("RMRS args must have a nodeid.") # make the EPs more MRS-like (with arguments) for ep in eps: if ep.nodeid is None: raise XmrsStructureError("RMRS EPs must have a nodeid.") epargs = ep.args for rargname, value in args.get(ep.nodeid, {}).items(): epargs[rargname] = value hcons = list(hcons or []) icons = list(icons or []) return Xmrs(top=top, index=index, xarg=xarg, eps=eps, hcons=hcons, icons=icons, vars=vars, lnk=lnk, surface=surface, identifier=identifier)
[docs]class Dmrs(Xmrs): """ Construct an :class:`Xmrs` using DMRS components. Dependency Minimal Recursion Semantics (DMRS) have a list of Node objects and a list of Link objects. There are no variables or handles, so these will need to be created in order to make an Xmrs object. The *top* node may be set directly via a parameter or may be implicitly set via a Link from the special nodeid 0. If both are given, the link is ignored. The *index* and *xarg* nodes may only be set via parameters. Args: nodes: an iterable of Node objects links: an iterable of Link objects top: the scopal top node index: the non-scopal top node xarg: the external argument node lnk: the Lnk object associating the MRS to the surface form surface: the surface string identifier: a discourse-utterance id Example: >>> rain = Node(10000, Pred.surface('_rain_v_1_rel'), >>> sortinfo={'cvarsort': 'e'}) >>> ltop_link = Link(0, 10000, post='H') >>> d = Dmrs([rain], [ltop_link]) """ def __init__( self, nodes=None, links=None, top=None, index=None, xarg=None, lnk=None, surface=None, identifier=None): if nodes is None: nodes = [] if links is None: links = [] qeq = HandleConstraint.qeq vgen = _VarGenerator() # check this here to streamline things later if top is not None: links = [Link(LTOP_NODEID, top, None, H_POST)] + list(links) top = None labels = _make_labels(nodes, links, vgen) qs = set(l.start for l in links if (l.rargname or '').upper() == RSTR_ROLE) ivs = _make_ivs(nodes, vgen, qs) # initialize args with ARG0 for intrinsic variables args = {nid: {IVARG_ROLE: iv} for nid, iv in ivs.items()} hcons = [] for l in links: if l.start not in args: args[l.start] = {} if safe_int(l.start) != LTOP_NODEID: if not l.rargname or l.rargname.upper() == BARE_EQ_ROLE: continue # don't make an argument for bare EQ links if l.post == H_POST: hole = vgen.new(HANDLESORT)[0] hcons += [qeq(hole, labels[l.end])] args[l.start][l.rargname] = hole # if the arg is RSTR, it's a quantifier, so we can # find its intrinsic variable now if l.rargname.upper() == RSTR_ROLE: ivs[l.start] = ivs[l.end] args[l.start][IVARG_ROLE] = ivs[l.start] elif l.post == HEQ_POST: args[l.start][l.rargname] = labels[l.end] else: # NEQ_POST or EQ_POST args[l.start][l.rargname] = ivs[l.end] # ignore top link if top is already set elif top is None: # The most explicit value of post for a link that denotes a # TOP that is qeq to a label is H_POST, but I equally accept # NIL_POST for backward compatibility. HEQ_POST denotes a TOP # that selects a label directly (and this label equality would # have been captured earlier) top = labels[l.start] if l.post == H_POST or l.post == NIL_POST: hcons += [qeq(top, labels[l.end])] eps = [] for node in nodes: nid = node.nodeid if node.carg is not None: args[nid][CONSTARG_ROLE] = node.carg ep = (nid, node.pred, labels[nid], args[nid], node.lnk, node.surface, node.base) eps.append(ep) icons = None # future feature super(Dmrs, self).__init__( top=top, index=ivs.get(index), xarg=ivs.get(xarg), eps=eps, hcons=hcons, icons=icons, vars=vgen.store, lnk=lnk, surface=surface, identifier=identifier )
[docs] def to_dict(self, short_pred=True, properties=True): """ Encode the Dmrs as a dictionary suitable for JSON serialization. """ qs = set(self.nodeids(quantifier=True)) def _lnk(obj): return {'from': obj.cfrom, 'to': obj.cto} def _node(node, short_pred=True): p = node.pred.short_form() if short_pred else node.pred.string d = dict(nodeid=node.nodeid, predicate=p) if node.lnk is not None: d['lnk'] = _lnk(node) if properties and node.sortinfo: if node.nodeid not in qs: d['sortinfo'] = node.sortinfo if node.surface is not None: d['surface'] = node.surface if node.base is not None: d['base'] = node.base if node.carg is not None: d['carg'] = node.carg return d def _link(link): return { 'from': link.start, 'to': link.end, 'rargname': link.rargname, 'post': link.post } d = dict( nodes=[_node(n) for n in nodes(self)], links=[_link(l) for l in links(self)] ) # if self.top is not None: ... currently handled by links if self.index is not None: idx = self.nodeid(self.index) if idx is not None: d['index'] = idx if self.xarg is not None: xarg = self.nodeid(self.index) if xarg is not None: d['index'] = xarg if self.lnk is not None: d['lnk'] = _lnk(self) if self.surface is not None: d['surface'] = self.surface if self.identifier is not None: d['identifier'] = self.identifier return d
[docs] @classmethod def from_dict(cls, d): """ Decode a dictionary, as from :meth:`to_dict`, into a Dmrs object. """ def _node(obj): return Node( obj.get('nodeid'), Pred.surface_or_abstract(obj.get('predicate')), sortinfo=obj.get('sortinfo'), lnk=_lnk(obj.get('lnk')), surface=obj.get('surface'), base=obj.get('base'), carg=obj.get('carg') ) def _link(obj): return Link(obj.get('from'), obj.get('to'), obj.get('rargname'), obj.get('post')) def _lnk(o): return None if o is None else Lnk.charspan(o['from'], o['to']) return cls( nodes=[_node(n) for n in d.get('nodes', [])], links=[_link(l) for l in d.get('links', [])], lnk=_lnk(d.get('lnk')), surface=d.get('surface'), identifier=d.get('identifier') )
[docs] def to_triples(self, short_pred=True, properties=True): """ Encode the Dmrs as triples suitable for PENMAN serialization. """ ts = [] qs = set(self.nodeids(quantifier=True)) for n in nodes(self): pred = n.pred.short_form() if short_pred else n.pred.string ts.append((n.nodeid, 'predicate', pred)) if n.lnk is not None: ts.append((n.nodeid, 'lnk', '"{}"'.format(str(n.lnk)))) if n.carg is not None: ts.append((n.nodeid, 'carg', '"{}"'.format(n.carg))) if properties and n.nodeid not in qs: for key, value in n.sortinfo.items(): ts.append((n.nodeid, key.lower(), value)) for l in links(self): if safe_int(l.start) == LTOP_NODEID: ts.append((l.start, 'top', l.end)) else: relation = '{}-{}'.format(l.rargname.upper(), l.post) ts.append((l.start, relation, l.end)) return ts
[docs] @classmethod def from_triples(cls, triples, remap_nodeids=True): """ Decode triples, as from :meth:`to_triples`, into a Dmrs object. """ top_nid = str(LTOP_NODEID) top = lnk = surface = identifier = None nids, nd, edges = [], {}, [] for src, rel, tgt in triples: src, tgt = str(src), str(tgt) # hack for int-converted src/tgt if src == top_nid and rel == 'top': top = tgt continue elif src not in nd: if top is None: top=src nids.append(src) nd[src] = {'pred': None, 'lnk': None, 'carg': None, 'si': []} if rel == 'predicate': nd[src]['pred'] = Pred.surface_or_abstract(tgt) elif rel == 'lnk': cfrom, cto = tgt.strip('"<>').split(':') nd[src]['lnk'] = Lnk.charspan(int(cfrom), int(cto)) elif rel == 'carg': if (tgt[0], tgt[-1]) == ('"', '"'): tgt = tgt[1:-1] nd[src]['carg'] = tgt elif rel.islower(): nd[src]['si'].append((rel, tgt)) else: rargname, post = rel.rsplit('-', 1) edges.append((src, tgt, rargname, post)) if remap_nodeids: nidmap = dict((nid, FIRST_NODEID+i) for i, nid in enumerate(nids)) else: nidmap = dict((nid, nid) for nid in nids) nodes = [ Node( nodeid=nidmap[nid], pred=nd[nid]['pred'], sortinfo=nd[nid]['si'], lnk=nd[nid]['lnk'], carg=nd[nid]['carg'] ) for i, nid in enumerate(nids) ] links = [Link(nidmap[s], nidmap[t], r, p) for s, t, r, p in edges] if top: links.append(Link(LTOP_NODEID, nidmap[top], None, H_POST)) return cls( nodes=nodes, links=links, lnk=lnk, surface=surface, identifier=identifier )
def _make_labels(nodes, links, vgen): nids = [node.nodeid for node in nodes] edges = [] for l in links: if safe_int(l.start) == LTOP_NODEID: nids = [l.start] + nids vgen.vid = 0 # start at h0 for TOP if l.post == EQ_POST: edges.append((l.start, l.end)) labels = {} # components return in order of nids for component in _connected_components(nids, edges): lbl = vgen.new(HANDLESORT)[0] for nid in component: labels[nid] = lbl return labels def _make_ivs(nodes, vgen, qs): ivs = {} for node in nodes: # quantifiers share their IV with the quantifiee. It will be # selected later during argument construction if node.nodeid not in qs: props = dict((key, val) for key, val in node.sortinfo.items() if key != CVARSORT) ivs[node.nodeid] = vgen.new(node.cvarsort, props)[0] return ivs def _ivs_in_scope(nodeid, _eps, _vars, _hcons): ivs = set() args = _eps[nodeid][3] for role, val in args.items(): if role == IVARG_ROLE: ivs.add(val) elif role == CONSTARG_ROLE: pass elif var_sort(val) == HANDLESORT: if val in _hcons: val = _hcons[val].lo for conj_nid in _vars[val]['refs']['LBL']: ivs.update(_ivs_in_scope(conj_nid, _eps, _vars, _hcons)) return ivs