# coding: utf-8
"""
Regular Expression Preprocessor (REPP)
"""
import re
from sre_parse import parse_template
from pathlib import Path
from array import array
from collections import namedtuple
from delphin.tokens import YYToken, YYTokenLattice
from delphin.lnk import Lnk
from delphin.exceptions import PyDelphinException
# Default modules need to import the PyDelphin version
from delphin.__about__ import __version__ # noqa: F401
[docs]class REPPError(PyDelphinException):
"""Raised when there is an error in tokenizing with REPP."""
[docs]class REPPStep(namedtuple(
'REPPStep', 'input output operation applied startmap endmap')):
"""
A single rule application in REPP.
Attributes:
input (str): input string (prior to application)
output (str): output string (after application)
operation: operation performed
applied (bool): `True` if the rule was applied
startmap (:py:class:`array`): integer array of start offsets
endmap (:py:class:`array`): integer array of end offsets
"""
[docs]class REPPResult(namedtuple(
'REPPResult', 'string startmap endmap')):
"""
The final result of REPP application.
Attributes:
string (str): resulting string after all rules have applied
startmap (:py:class:`array`): integer array of start offsets
endmap (:py:class:`array`): integer array of end offsets
"""
class _REPPOperation(object):
"""
The supertype of REPP groups and rules.
This class defines the apply(), trace(), and tokenize() methods
which are available in [_REPPRule], [_REPPGroup],
[_REPPIterativeGroup], and [REPP] instances.
"""
def _apply(self, s, active):
raise NotImplementedError()
def apply(self, s, active=None):
for step in self.trace(s, active=active):
pass
return step
def trace(self, s, active=None, verbose=False):
startmap = _zeromap(s)
endmap = _zeromap(s)
# initial boundaries
startmap[0] = 1
endmap[-1] = -1
step = None
for step in self._apply(s, active):
if step.applied or verbose:
yield step
if step.applied:
startmap = _mergemap(startmap, step.startmap)
endmap = _mergemap(endmap, step.endmap)
if step is not None:
s = step.output
yield REPPResult(s, startmap, endmap)
def tokenize(self, s, pattern=r'[ \t]+', active=None):
res = self.apply(s, active=active)
tokens = [
YYToken(id=i, start=i, end=i+1,
lnk=Lnk.charspan(tok[0], tok[1]),
form=tok[2])
for i, tok in enumerate(_tokenize(res, pattern))
]
return YYTokenLattice(tokens)
class _REPPRule(_REPPOperation):
"""
A REPP rewrite rule.
The apply() method of this class works like re.sub() in Python's
standard library, but it analyzes the replacement pattern in order
to ensure that character positions in the resulting string can be
traced back (as much as possible) to the original string.
Args:
pattern: the regular expression pattern to match
replacement: the replacement template
"""
def __init__(self, pattern, replacement):
self.pattern = pattern
self.replacement = replacement
self._re = re.compile(pattern)
groups, literals = parse_template(replacement, self._re)
# if a literal is None then it has a group, so make this
# easier to iterate over by making pairs of (literal, None) or
# (None, group)
group_map = dict(groups)
self._segments = [(literal, group_map.get(i))
for i, literal in enumerate(literals)]
# Get "trackable" capture groups; i.e., those that are
# transparent for characterization. For PET behavior, these
# must appear in strictly increasing order with no gaps
self._last_trackable = -1 # index of trackable segment, not group id
last_trackable_group = 0
for i, group in groups:
if group == last_trackable_group + 1:
self._last_trackable = i
last_trackable_group = group
else:
break
def __str__(self):
return '!{}\t\t{}'.format(self.pattern, self.replacement)
def _apply(self, s, active):
ms = list(self._re.finditer(s))
if ms:
pos = 0 # current position in the original string
shift = 0 # current original/target length difference
parts = []
smap = array('i', [0])
emap = array('i', [0])
for m in ms:
start = m.start()
if pos < start:
_copy_part(s[pos:start], shift, parts, smap, emap)
for literal, start, end, tracked in self._itersegments(m):
if tracked:
_copy_part(literal, shift, parts, smap, emap)
else:
width = end - start
_insert_part(literal, width, shift, parts, smap, emap)
shift += width - len(literal)
pos = m.end()
if pos < len(s):
_copy_part(s[pos:], shift, parts, smap, emap)
smap.append(shift)
emap.append(shift - 1)
o = ''.join(parts)
applied = True
else:
o = s
smap = _zeromap(o)
emap = _zeromap(o)
applied = False
yield REPPStep(s, o, self, applied, smap, emap)
def _itermatches(self, ms):
"""Yield pairs of the last affected position and a match."""
last_pos = 0
for m in ms:
yield (last_pos, m)
last_pos = m.end()
def _itersegments(self, m):
"""Yield tuples of (replacement, start, end, tracked)."""
start = m.start()
# first yield segments that might be trackable
tracked = self._segments[:self._last_trackable + 1]
if tracked:
spans = {group: m.span(group)
for literal, group in tracked
if literal is None}
end = m.start(1) # if literal before tracked group
for literal, group in tracked:
if literal is None:
start, end = spans[group]
yield (m.group(group), start, end, True)
start = end
if group + 1 in spans:
end = spans[group + 1][0]
else:
yield (literal, start, end, False)
# then group all remaining segments together
remaining = self._segments[self._last_trackable + 1:]
if remaining:
literal = ''.join(
m.group(group) if literal is None else literal
for literal, group in remaining)
yield (literal, start, m.end(), False)
class _REPPGroup(_REPPOperation):
def __init__(self, operations=None, name=None):
if operations is None:
operations = []
self.operations = operations
self.name = name
def __repr__(self):
name = '("{}") '.format(self.name) if self.name is not None else ''
return '<{} object {}at {}>'.format(
self.__class__.__name__, name, id(self)
)
def __str__(self):
return 'Module {}'.format(self.name if self.name is not None else '')
def _apply(self, s, active):
o = s
applied = False
for operation in self.operations:
for step in operation._apply(o, active):
yield step
o = step.output
applied |= step.applied
yield REPPStep(s, o, self, applied, _zeromap(o), _zeromap(o))
class _REPPGroupCall(_REPPOperation):
def __init__(self, name, modules):
self.name = name
self.modules = modules
def _apply(self, s, active):
if active is not None and self.name in active:
yield from self.modules[self.name]._apply(s, active)
class _REPPIterativeGroup(_REPPGroup):
def __str__(self):
return 'Internal group #{}'.format(self.name)
def _apply(self, s, active):
o = s
applied = False
prev = None
while prev != o:
prev = o
for operation in self.operations:
for step in operation._apply(o, active):
yield step
o = step.output
applied |= step.applied
yield REPPStep(s, o, self, applied, _zeromap(o), _zeromap(o))
[docs]class REPP(object):
"""
A Regular Expression Pre-Processor (REPP).
The normal way to create a new REPP is to read a .rpp file via the
:meth:`from_file` classmethod. For REPPs that are defined in code,
there is the :meth:`from_string` classmethod, which parses the same
definitions but does not require file I/O. Both methods, as does
the class's `__init__()` method, allow for pre-loaded and named
external *modules* to be provided, which allow for external group
calls (also see :meth:`from_file` or implicit module loading). By
default, all external submodules are deactivated, but they can be
activated by adding the module names to *active* or, later, via the
:meth:`activate` method.
A third classmethod, :meth:`from_config`, reads a PET-style
configuration file (e.g., `repp.set`) which may specify the
available and active modules, and therefore does not take the
*modules* and *active* parameters.
Args:
name (str, optional): the name assigned to this module
modules (dict, optional): a mapping from identifiers to REPP
modules
active (iterable, optional): an iterable of default module
activations
"""
def __init__(self, name=None, modules=None, active=None):
self.info = None
self.tokenize_pattern = None
self.group = _REPPGroup(name=name)
if modules is None:
modules = []
self.modules = dict(modules)
self.active = set()
if active is None:
active = []
for mod in active:
self.activate(mod)
[docs] @classmethod
def from_config(cls, path, directory=None):
"""
Instantiate a REPP from a PET-style `.set` configuration file.
The *path* parameter points to the configuration file.
Submodules are loaded from *directory*. If *directory* is not
given, it is the directory part of *path*.
Args:
path (str): the path to the REPP configuration file
directory (str, optional): the directory in which to search
for submodules
"""
path = Path(path).expanduser()
if not path.is_file():
raise REPPError('REPP config file not found: {!s}'.format(path))
confdir = path.parent
# TODO: can TDL parsing be repurposed for this variant?
conf = path.read_text(encoding='utf-8')
conf = re.sub(r';.*', '', conf).replace('\n', ' ')
m = re.search(
r'repp-modules\s*:=\s*((?:[-\w]+\s+)*[-\w]+)\s*\.', conf)
t = re.search(
r'repp-tokenizer\s*:=\s*([-\w]+)\s*\.', conf)
a = re.search(
r'repp-calls\s*:=\s*((?:[-\w]+\s+)*[-\w]+)\s*\.', conf)
# f = re.search(
# r'format\s*:=\s*(\w+)\s*\.', conf)
d = re.search(
r'repp-directory\s*:=\s*(.*)\.\s*$', conf)
if m is None:
raise REPPError('repp-modules option must be set')
if t is None:
raise REPPError('repp-tokenizer option must be set')
# mods = m.group(1).split()
tok = t.group(1).strip()
active = a.group(1).split() if a is not None else None
# fmt = f.group(1).strip() if f is not None else None
if directory is None:
if d is not None:
directory = d.group(1).strip(' "')
elif confdir.joinpath(tok + '.rpp').is_file():
directory = confdir
elif confdir.joinpath('rpp', tok + '.rpp').is_file():
directory = confdir.joinpath('rpp')
elif confdir.joinpath('../rpp', tok + '.rpp').is_file():
directory = confdir.joinpath('../rpp')
else:
raise REPPError('Could not find a suitable REPP directory.')
# ignore repp-modules and format?
return REPP.from_file(
directory.joinpath(tok + '.rpp'),
directory=directory,
active=active
)
[docs] @classmethod
def from_file(cls, path, directory=None, modules=None, active=None):
"""
Instantiate a REPP from a `.rpp` file.
The *path* parameter points to the top-level module. Submodules
are loaded from *directory*. If *directory* is not given, it is
the directory part of *path*.
A REPP module may utilize external submodules, which may be
defined in two ways. The first method is to map a module name
to an instantiated REPP instance in *modules*. The second
method assumes that an external group call `>abc` corresponds
to a file `abc.rpp` in *directory* and loads that file. The
second method only happens if the name (e.g., `abc`) does not
appear in *modules*. Only one module may define a tokenization
pattern.
Args:
path (str): the path to the base REPP file to load
directory (str, optional): the directory in which to search
for submodules
modules (dict, optional): a mapping from identifiers to
REPP modules
active (iterable, optional): an iterable of default module
activations
"""
path = Path(path).expanduser()
if directory is not None:
directory = Path(directory).expanduser()
else:
directory = path.parent
name = path.with_suffix('').name
lines = _repp_lines(path)
r = cls(name=name, modules=modules, active=active)
_parse_repp(lines, r, directory)
return r
[docs] @classmethod
def from_string(cls, s, name=None, modules=None, active=None):
"""
Instantiate a REPP from a string.
Args:
name (str, optional): the name of the REPP module
modules (dict, optional): a mapping from identifiers to
REPP modules
active (iterable, optional): an iterable of default module
activations
"""
r = cls(name=name, modules=modules, active=active)
_parse_repp(s.splitlines(), r, None)
return r
[docs] def activate(self, mod):
"""
Set external module *mod* to active.
"""
self.active.add(mod)
[docs] def deactivate(self, mod):
"""
Set external module *mod* to inactive.
"""
if mod in self.active:
self.active.remove(mod)
def _apply(self, s, active):
return self.group._apply(s, active)
[docs] def apply(self, s, active=None):
"""
Apply the REPP's rewrite rules to the input string *s*.
Args:
s (str): the input string to process
active (optional): a collection of external module names
that may be applied if called
Returns:
a :class:`REPPResult` object containing the processed
string and characterization maps
"""
if active is None:
active = self.active
return self.group.apply(s, active=active)
[docs] def trace(self, s, active=None, verbose=False):
"""
Rewrite string *s* like `apply()`, but yield each rewrite step.
Args:
s (str): the input string to process
active (optional): a collection of external module names
that may be applied if called
verbose (bool, optional): if `False`, only output rules or
groups that matched the input
Yields:
a :class:`REPPStep` object for each intermediate rewrite
step, and finally a :class:`REPPResult` object after
the last rewrite
"""
if active is None:
active = self.active
return self.group.trace(s, active=active, verbose=verbose)
[docs] def tokenize(self, s, pattern=None, active=None):
"""
Rewrite and tokenize the input string *s*.
Args:
s (str): the input string to process
pattern (str, optional): the regular expression pattern on
which to split tokens; defaults to `[ \t]+`
active (optional): a collection of external module names
that may be applied if called
Returns:
a :class:`~delphin.tokens.YYTokenLattice` containing the
tokens and their characterization information
"""
if pattern is None:
if self.tokenize_pattern is None:
pattern = r'[ \t]+'
else:
pattern = self.tokenize_pattern
if active is None:
active = self.active
return self.group.tokenize(s, pattern=pattern, active=active)
def _zeromap(s):
return array('i', [0] * (len(s) + 2))
def _mergemap(map1, map2):
"""
Positions in map2 have an integer indicating the relative shift to
the equivalent position in map1. E.g., the i'th position in map2
corresponds to the i + map2[i] position in map1.
"""
merged = array('i', [0] * len(map2))
for i, shift in enumerate(map2):
newshift = shift + map1[i + shift]
merged[i] = newshift
return merged
def _copy_part(s, shift, parts, smap, emap):
parts.append(s)
smap.extend([shift] * len(s))
emap.extend([shift] * len(s))
def _insert_part(s, w, shift, parts, smap, emap):
parts.append(s)
a = shift
b = a - len(s)
smap.extend(range(a, b, -1))
a = shift + w - 1
b = a - len(s)
emap.extend(range(a, b, -1))
def _tokenize(result, pattern):
s, sm, em = result # unpack for efficiency in loop
toks = []
pos = 0
for m in re.finditer(pattern, result.string):
if pos < m.start():
toks.append((pos + sm[pos + 1],
m.start() + em[m.start()],
s[pos:m.start()]))
pos = m.end()
if pos < len(s):
toks.append((pos + sm[pos + 1],
len(s) + em[len(s)],
s[pos:]))
return toks
def _repp_lines(path):
if not path.is_file():
raise REPPError('REPP file not found: {!s}'.format(path))
return path.read_text(encoding='utf-8').splitlines()
def _parse_repp(lines, r, directory):
ops = list(_parse_repp_group(lines, r, directory))
if lines:
raise REPPError('Unexpected termination; maybe the # operator '
'appeared without an internal group.')
r.group.operations.extend(ops)
def _parse_repp_group(lines, r, directory):
igs = {} # internal groups
while lines:
line = lines.pop(0)
if line.startswith(';') or line.strip() == '':
continue # skip comments and empty lines
elif line[0] == '!':
match = re.match(r'([^\t]+)\t+(.*)', line[1:])
if match is None:
raise REPPError('Invalid rewrite rule: {}'.format(line))
yield _REPPRule(match.group(1), match.group(2))
elif line[0] == '<':
fn = directory.joinpath(line[1:].rstrip())
lines = _repp_lines(fn) + lines
elif line[0] == '>':
modname = line[1:].rstrip()
if modname.isdigit():
if modname in igs:
yield igs[modname]
else:
raise REPPError(
'Iterative group not defined: ' + modname
)
else:
if modname not in r.modules:
if directory is None:
raise REPPError('Cannot implicitly load modules if '
'a directory is not given.')
mod = REPP.from_file(
directory.joinpath(modname + '.rpp'),
directory=directory,
modules=r.modules
)
r.modules[modname] = mod
yield _REPPGroupCall(modname, r.modules)
elif line[0] == '#':
igname = line[1:].rstrip()
if igname.isdigit():
if igname in igs:
raise REPPError(
'Internal group name already defined: ' + igname
)
igs[igname] = _REPPIterativeGroup(
operations=list(
_parse_repp_group(lines, r, directory)
),
name=igname
)
elif igname == '':
return
else:
raise REPPError('Invalid internal group name: ' + igname)
elif line[0] == ':':
if r.tokenize_pattern is not None:
raise REPPError(
'Only one tokenization pattern (:) may be defined.'
)
r.tokenize_pattern = line[1:]
elif line[0] == '@':
if r.info is not None:
raise REPPError(
'No more than one meta-info declaration (@) may be '
'defined.'
)
r.info = line[1:]
else:
raise REPPError('Invalid declaration: {}'.format(line))