# coding: utf-8
"""
Regular Expression Preprocessor (REPP)
A Regular-Expression Preprocessor [REPP]_ is a method of applying a
system of regular expressions for transformation and tokenization while
retaining character indices from the original input string.
.. [REPP] Rebecca Dridan and Stephan Oepen. Tokenization: Returning to
a long solved problem---a survey, contrastive experiment,
recommendations, and toolkit. In Proceedings of the 50th Annual
Meeting of the Association for Computational Linguistics (Volume 2:
Short Papers), pages 378–382, Jeju Island, Korea, July 2012.
Association for Computational Linguistics.
URL http://www.aclweb.org/anthology/P12-2074.
"""
from __future__ import unicode_literals
from os.path import exists, dirname, basename, join as joinpath
import io
import warnings
import re
from sre_parse import parse_template
from array import array
from collections import namedtuple
from delphin.tokens import YyToken, YyTokenLattice
from delphin.mrs.components import Lnk
from delphin.exceptions import REPPError
[docs]class REPPStep(namedtuple(
'REPPStep', 'input output operation applied startmap endmap')):
"""
A single rule application in REPP.
Attributes:
input (str): input string (prior to application)
output (str): output string (after application)
operation: operation performed
applied (bool): `True` if the rule was applied
startmap (:py:class:`array`): integer array of start offsets
endmap (:py:class:`array`): integer array of end offsets
"""
[docs]class REPPResult(namedtuple(
'REPPResult', 'string startmap endmap')):
"""
The final result of REPP application.
Attributes:
string (str): resulting string after all rules have applied
startmap (:py:class:`array`): integer array of start offsets
endmap (:py:class:`array`): integer array of end offsets
"""
class _REPPOperation(object):
"""
The supertype of REPP groups and rules.
This class defines the apply(), trace(), and tokenize() methods
which are available in [_REPPRule], [_REPPGroup],
[_REPPIterativeGroup], and [REPP] instances.
"""
def _apply(self, s, active):
raise NotImplementedError()
def apply(self, s, active=None):
for step in self.trace(s, active=active):
pass
return step
def trace(self, s, active=None, verbose=False):
startmap = _zeromap(s)
endmap = _zeromap(s)
# initial boundaries
startmap[0] = 1
endmap[-1] = -1
step = None
for step in self._apply(s, active):
if step.applied or verbose:
yield step
startmap = _mergemap(startmap, step.startmap)
endmap = _mergemap(endmap, step.endmap)
if step is not None:
s = step.output
yield REPPResult(s, startmap, endmap)
def tokenize(self, s, pattern=r'[ \t]+', active=None):
res = self.apply(s, active=active)
tokens = [
YyToken(id=i, start=i, end=i+1,
lnk=Lnk.charspan(tok[0], tok[1]),
form=tok[2])
for i, tok in enumerate(_tokenize(res, pattern))
]
return YyTokenLattice(tokens)
class _REPPRule(_REPPOperation):
"""
A REPP rewrite rule.
The apply() method of this class works like re.sub() in Python's
standard library, but it analyzes the replacement pattern in order
to ensure that character positions in the resulting string can be
traced back (as much as possible) to the original string.
Args:
pattern: the regular expression pattern to match
replacement: the replacement template
"""
def __init__(self, pattern, replacement):
self.pattern = pattern
self.replacement = replacement
self._re = re.compile(pattern)
backrefs, literals = parse_template(replacement, self._re)
self._backrefs = dict(backrefs)
self._segments = literals
# Get "trackable" capture groups; i.e., those that are
# transparent for characterization. For PET behavior, these
# must appear in strictly increasing order with no gaps
self._groupstack = []
for i, backref in enumerate(backrefs):
_, grpidx = backref
if i + 1 != grpidx:
break
self._groupstack.append(grpidx)
def __str__(self):
return '!{}\t\t{}'.format(self.pattern, self.replacement)
def _apply(self, s, active):
ms = list(self._re.finditer(s))
if ms:
pos = 0 # current position in original string
shift = 0 # current original/target length difference
parts = []
smap = array('i', [0])
emap = array('i', [0])
for m in ms:
cgs = list(reversed(self._groupstack)) # make a copy
lb = m.start()
rb = m.start(cgs[-1]) if cgs else m.end()
if pos < m.start():
_copy_part(s[pos:m.start()], shift, parts, smap, emap)
pos = m.start()
for i, seg in enumerate(self._segments):
if seg is None and cgs:
grpidx = cgs.pop()
assert self._backrefs[i] == grpidx
seg = m.group(grpidx)
_copy_part(seg, shift, parts, smap, emap)
# adjust for the next segment
lb = m.end(grpidx)
rb = m.start(cgs[-1]) if cgs else m.end()
pos += len(seg)
else:
if seg is None:
seg = m.group(i)
w = rb - lb # segment width
_insert_part(seg, w, shift, parts, smap, emap)
pos += w
shift = pos - sum(map(len, parts))
if pos < len(s):
_copy_part(s[pos:], shift, parts, smap, emap)
smap.append(shift)
emap.append(shift - 1)
o = ''.join(parts)
applied = True
else:
o = s
smap = _zeromap(o)
emap = _zeromap(o)
applied = False
yield REPPStep(s, o, self, applied, smap, emap)
class _REPPGroup(_REPPOperation):
def __init__(self, operations=None, name=None):
if operations is None: operations = []
self.operations = operations
self.name = name
def __repr__(self):
name = '("{}") '.format(self.name) if self.name is not None else ''
return '<{} object {}at {}>'.format(
self.__class__.__name__, name, id(self)
)
def __str__(self):
return 'Module {}'.format(self.name if self.name is not None else '')
def _apply(self, s, active):
o = s
applied = False
for operation in self.operations:
for step in operation._apply(o, active):
yield step
o = step.output
applied |= step.applied
yield REPPStep(s, o, self, applied, _zeromap(o), _zeromap(o))
class _REPPGroupCall(_REPPOperation):
def __init__(self, name, modules):
self.name = name
self.modules = modules
def _apply(self, s, active):
if active is not None and self.name in active:
# TODO 'yield from' may be useful here when Python2.7 is
# no longer supported. See issue #115
for step in self.modules[self.name]._apply(s, active):
yield step
class _REPPIterativeGroup(_REPPGroup):
def __str__(self):
return 'Internal group #{}'.format(self.name)
def _apply(self, s, active):
o = s
applied = False
prev = None
while prev != o:
prev = o
for operation in self.operations:
for step in operation._apply(o, active):
yield step
o = step.output
applied |= step.applied
yield REPPStep(s, o, self, applied, _zeromap(o), _zeromap(o))
[docs]class REPP(object):
"""
A Regular Expression Pre-Processor (REPP).
The normal way to create a new REPP is to read a .rpp file via the
:meth:`from_file` classmethod. For REPPs that are defined in code,
there is the :meth:`from_string` classmethod, which parses the same
definitions but does not require file I/O. Both methods, as does
the class's `__init__()` method, allow for pre-loaded and named
external *modules* to be provided, which allow for external group
calls (also see :meth:`from_file` or implicit module loading). By
default, all external submodules are deactivated, but they can be
activated by adding the module names to *active* or, later, via the
:meth:`activate` method.
A third classmethod, :meth:`from_config`, reads a PET-style
configuration file (e.g., `repp.set`) which may specify the
available and active modules, and therefore does not take the
*modules* and *active* parameters.
Args:
name (str, optional): the name assigned to this module
modules (dict, optional): a mapping from identifiers to REPP
modules
active (iterable, optional): an iterable of default module
activations
"""
def __init__(self, name=None, modules=None, active=None):
self.info = None
self.tokenize_pattern = None
self.group = _REPPGroup(name=name)
if modules is None:
modules = []
self.modules = dict(modules)
self.active = set()
if active is None:
active = []
for mod in active:
self.activate(mod)
[docs] @classmethod
def from_config(cls, path, directory=None):
"""
Instantiate a REPP from a PET-style `.set` configuration file.
The *path* parameter points to the configuration file.
Submodules are loaded from *directory*. If *directory* is not
given, it is the directory part of *path*.
Args:
path (str): the path to the REPP configuration file
directory (str, optional): the directory in which to search
for submodules
"""
if not exists(path):
raise REPPError('REPP config file not found: {}'.format(path))
confdir = dirname(path)
# TODO: can TDL parsing be repurposed for this variant?
conf = io.open(path, encoding='utf-8').read()
conf = re.sub(r';.*', '', conf).replace('\n',' ')
m = re.search(
r'repp-modules\s*:=\s*((?:[-\w]+\s+)*[-\w]+)\s*\.', conf)
t = re.search(
r'repp-tokenizer\s*:=\s*([-\w]+)\s*\.', conf)
a = re.search(
r'repp-calls\s*:=\s*((?:[-\w]+\s+)*[-\w]+)\s*\.', conf)
f = re.search(
r'format\s*:=\s*(\w+)\s*\.', conf)
d = re.search(
r'repp-directory\s*:=\s*(.*)\.\s*$', conf)
if m is None:
raise REPPError('repp-modules option must be set')
if t is None:
raise REPPError('repp-tokenizer option must be set')
mods = m.group(1).split()
tok = t.group(1).strip()
active = a.group(1).split() if a is not None else None
fmt = f.group(1).strip() if f is not None else None
if directory is None:
if d is not None:
directory = d.group(1).strip(' "')
elif exists(joinpath(confdir, tok + '.rpp')):
directory = confdir
elif exists(joinpath(confdir, 'rpp', tok + '.rpp')):
directory = joinpath(confdir, 'rpp')
elif exists(joinpath(confdir, '../rpp', tok + '.rpp')):
directory = joinpath(confdir, '../rpp')
else:
raise REPPError('Could not find a suitable REPP directory.')
# ignore repp-modules and format?
return REPP.from_file(
joinpath(directory, tok + '.rpp'),
directory=directory,
active=active
)
[docs] @classmethod
def from_file(cls, path, directory=None, modules=None, active=None):
"""
Instantiate a REPP from a `.rpp` file.
The *path* parameter points to the top-level module. Submodules
are loaded from *directory*. If *directory* is not given, it is
the directory part of *path*.
A REPP module may utilize external submodules, which may be
defined in two ways. The first method is to map a module name
to an instantiated REPP instance in *modules*. The second
method assumes that an external group call `>abc` corresponds
to a file `abc.rpp` in *directory* and loads that file. The
second method only happens if the name (e.g., `abc`) does not
appear in *modules*. Only one module may define a tokenization
pattern.
Args:
path (str): the path to the base REPP file to load
directory (str, optional): the directory in which to search
for submodules
modules (dict, optional): a mapping from identifiers to
REPP modules
active (iterable, optional): an iterable of default module
activations
"""
name = basename(path)
if name.endswith('.rpp'):
name = name[:-4]
lines = _repp_lines(path)
directory = dirname(path) if directory is None else directory
r = cls(name=name, modules=modules, active=active)
_parse_repp(lines, r, directory)
return r
[docs] @classmethod
def from_string(cls, s, name=None, modules=None, active=None):
"""
Instantiate a REPP from a string.
Args:
name (str, optional): the name of the REPP module
modules (dict, optional): a mapping from identifiers to
REPP modules
active (iterable, optional): an iterable of default module
activations
"""
r = cls(name=name, modules=modules, active=active)
_parse_repp(s.splitlines(), r, None)
return r
[docs] def activate(self, mod):
"""
Set external module *mod* to active.
"""
self.active.add(mod)
[docs] def deactivate(self, mod):
"""
Set external module *mod* to inactive.
"""
if mod in self.active:
self.active.remove(mod)
def _apply(self, s, active):
return self.group._apply(s, active)
[docs] def apply(self, s, active=None):
"""
Apply the REPP's rewrite rules to the input string *s*.
Args:
s (str): the input string to process
active (optional): a collection of external module names
that may be applied if called
Returns:
a :class:`REPPResult` object containing the processed
string and characterization maps
"""
if active is None:
active = self.active
return self.group.apply(s, active=active)
[docs] def trace(self, s, active=None, verbose=False):
"""
Rewrite string *s* like `apply()`, but yield each rewrite step.
Args:
s (str): the input string to process
active (optional): a collection of external module names
that may be applied if called
verbose (bool, optional): if `False`, only output rules or
groups that matched the input
Yields:
a :class:`REPPStep` object for each intermediate rewrite
step, and finally a :class:`REPPResult` object after
the last rewrite
"""
if active is None:
active = self.active
return self.group.trace(s, active=active, verbose=verbose)
[docs] def tokenize(self, s, pattern=None, active=None):
"""
Rewrite and tokenize the input string *s*.
Args:
s (str): the input string to process
pattern (str, optional): the regular expression pattern on
which to split tokens; defaults to `[ \t]+`
active (optional): a collection of external module names
that may be applied if called
Returns:
a :class:`~delphin.tokens.YyTokenLattice` containing the
tokens and their characterization information
"""
if pattern is None:
if self.tokenize_pattern is None:
pattern = r'[ \t]+'
else:
pattern = self.tokenize_pattern
if active is None:
active = self.active
return self.group.tokenize(s, pattern=pattern, active=active)
def _zeromap(s):
return array('i', [0] * (len(s) + 2))
def _mergemap(map1, map2):
"""
Positions in map2 have an integer indicating the relative shift to
the equivalent position in map1. E.g., the i'th position in map2
corresponds to the i + map2[i] position in map1.
"""
merged = array('i', [0] * len(map2))
for i, shift in enumerate(map2):
merged[i] = shift + map1[i + shift]
return merged
def _copy_part(s, shift, parts, smap, emap):
parts.append(s)
smap.extend([shift] * len(s))
emap.extend([shift] * len(s))
def _insert_part(s, w, shift, parts, smap, emap):
parts.append(s)
smap.extend(range(shift, shift - len(s), -1))
newshift = shift + (w - len(s))
emap.extend(range(shift + w - 1, newshift - 1, -1))
def _tokenize(result, pattern):
s, sm, em = result # unpack for efficiency in loop
toks = []
pos = 0
for m in re.finditer(pattern, result.string):
if pos < m.start():
toks.append((pos + sm[pos + 1],
m.start() + em[m.start()],
s[pos:m.start()]))
pos = m.end()
if pos < len(s):
toks.append((pos + sm[pos + 1],
len(s) + em[len(s)],
s[pos:]))
return toks
def _repp_lines(path):
if not exists(path):
raise REPPError('REPP file not found: {}'.format(path))
return io.open(path, encoding='utf-8').read().splitlines()
def _parse_repp(lines, r, directory):
ops = list(_parse_repp_group(lines, r, directory))
if lines:
raise REPPError('Unexpected termination; maybe the # operator '
'appeared without an internal group.')
r.group.operations.extend(ops)
def _parse_repp_group(lines, r, directory):
igs = {} # internal groups
while lines:
line = lines.pop(0)
if line.startswith(';') or line.strip() == '':
continue # skip comments and empty lines
elif line[0] == '!':
match = re.match(r'([^\t]+)\t+(.*)', line[1:])
if match is None:
raise REPPError('Invalid rewrite rule: {}'.format(line))
yield _REPPRule(match.group(1), match.group(2))
elif line[0] == '<':
fn = joinpath(directory, line[1:].rstrip())
lines = _repp_lines(fn) + lines
elif line[0] == '>':
modname = line[1:].rstrip()
if modname.isdigit():
if modname in igs:
yield igs[modname]
else:
raise REPPError(
'Iterative group not defined: ' + modname
)
else:
if modname not in r.modules:
if directory is None:
raise REPPError('Cannot implicitly load modules if '
'a directory is not given.')
mod = REPP.from_file(
joinpath(directory, modname + '.rpp'),
directory=directory,
modules=r.modules
)
r.modules[modname] = mod
yield _REPPGroupCall(modname, r.modules)
elif line[0] == '#':
igname = line[1:].rstrip()
if igname.isdigit():
if igname in igs:
raise REPPError(
'Internal group name already defined: ' + igname
)
igs[igname] = _REPPIterativeGroup(
operations=list(
_parse_repp_group(lines, r, directory)
),
name=igname
)
elif igname == '':
return
else:
raise REPPError('Invalid internal group name: ' + igname)
elif line[0] == ':':
if r.tokenize_pattern is not None:
raise REPPError(
'Only one tokenization pattern (:) may be defined.'
)
r.tokenize_pattern = line[1:]
elif line[0] == '@':
if r.info is not None:
raise REPPError(
'No more than one meta-info declaration (@) may be '
'defined.'
)
r.info = line[1:]
else:
raise REPPError('Invalid declaration: {}'.format(line))