# -*- coding: utf-8 -*-
"""
Classes and functions for working with [incr tsdb()] profiles.
The `itsdb` module provides classes and functions for working with
[incr tsdb()] profiles (or, more generally, testsuites; see
http://moin.delph-in.net/ItsdbTop). It handles the technical details
of encoding and decoding records in tables, including escaping and
unescaping reserved characters, pairing columns with their relational
descriptions, casting types (such as `:integer`, etc.), and
transparently handling gzipped tables, so that the user has a natural
way of working with the data. Capabilities include:
* Reading and writing testsuites:
>>> from delphin import itsdb
>>> ts = itsdb.TestSuite('jacy/tsdb/gold/mrs')
>>> ts.write(path='mrs-copy')
* Selecting data by table name, record index, and column name or index:
>>> items = ts['item'] # get the items table
>>> rec = items[0] # get the first record
>>> rec['i-input'] # input sentence of the first item
'雨 が 降っ た .'
>>> rec[0] # values are cast on index retrieval
11
>>> rec.get('i-id') # and on key retrieval
11
>>> rec.get('i-id', cast=False) # unless cast=False
'11'
* Selecting data as a query (note that types are cast by default):
>>> next(ts.select('item:i-id@i-input@i-date')) # query testsuite
[11, '雨 が 降っ た .', datetime.datetime(2006, 5, 28, 0, 0)]
>>> next(items.select('i-id@i-input@i-date')) # query table
[11, '雨 が 降っ た .', datetime.datetime(2006, 5, 28, 0, 0)]
* In-memory modification of testsuite data:
>>> # desegment each sentence
>>> for record in ts['item']:
... record['i-input'] = ''.join(record['i-input'].split())
...
>>> ts['item'][0]['i-input']
'雨が降った.'
* Joining tables
>>> joined = itsdb.join(ts['parse'], ts['result'])
>>> next(joined.select('i-id@mrs'))
[11, '[ LTOP: h1 INDEX: e2 [ e TENSE: PAST ...']
* Processing data with ACE (results are stored in memory)
>>> from delphin.interfaces import ace
>>> with ace.AceParser('jacy.dat') as cpu:
... ts.process(cpu)
...
NOTE: parsed 126 / 135 sentences, avg 3167k, time 1.87536s
>>> ts.write('new-profile')
This module covers all aspects of [incr tsdb()] data, from
:class:`Relations` files and :class:`Field` descriptions to
:class:`Record`, :class:`Table`, and full :class:`TestSuite` classes.
:class:`TestSuite` is the most user-facing interface, and it makes it
easy to load the tables of a testsuite into memory, inspect its
contents, modify or create data, and write the data to disk.
By default, the `itsdb` module expects testsuites to use the standard
[incr tsdb()] schema. Testsuites are always read and written according
to the associated or specified relations file, but other things, such
as default field values and the list of "core" tables, are defined for
the standard schema. It is, however, possible to define non-standard
schemata for particular applications, and most functions will continue
to work. One notable exception is the :meth:`TestSuite.process`
method, for which a new :class:`~delphin.interfaces.base.FieldMapper`
class must be defined.
"""
from __future__ import print_function
# TODO: Remove when Python2.7 support is gone
try:
unicode
except NameError:
unicode = str
import os
import re
from gzip import GzipFile, open as gzopen
import tempfile
import shutil
import logging
import io
from io import TextIOWrapper
from collections import (
defaultdict, namedtuple, OrderedDict, Sequence, Mapping
)
from itertools import chain
from contextlib import contextmanager
import weakref
from delphin.exceptions import ItsdbError
from delphin.util import (
safe_int, stringtypes, deprecated, parse_datetime
)
from delphin.interfaces.base import FieldMapper
##############################################################################
# Module variables
_relations_filename = 'relations'
_field_delimiter = '@'
_default_datatype_values = {
':integer': '-1'
}
tsdb_coded_attributes = {
'i-wf': 1,
'i-difficulty': 1,
'polarity': -1
}
_primary_keys = [
["i-id", "item"],
["p-id", "phenomenon"],
["ip-id", "item-phenomenon"],
["s-id", "set"],
["run-id", "run"],
["parse-id", "parse"],
["e-id", "edge"],
["f-id", "fold"]
]
tsdb_core_files = [
"item",
"analysis",
"phenomenon",
"parameter",
"set",
"item-phenomenon",
"item-set"
]
_default_task_input_selectors = {
'parse': 'item:i-input',
'transfer': 'result:mrs',
'generate': 'result:mrs',
}
#############################################################################
# Relations files
[docs]class Field(
namedtuple('Field', 'name datatype key partial comment'.split())):
'''
A tuple describing a column in an [incr tsdb()] profile.
Args:
name (str): the column name
datatype (str): `":string"`, `":integer"`, `":date"`,
or `":float"`
key (bool): `True` if the column is a key in the database
partial (bool): `True` if the column is a partial key
comment (str): a description of the column
'''
def __new__(cls, name, datatype, key=False, partial=False, comment=None):
if partial and not key:
raise ItsdbError('a partial key must also be a key')
return super(Field, cls).__new__(
cls, name, datatype, key, partial, comment
)
def __str__(self):
parts = [self.name, self.datatype]
if self.key:
parts += [':key']
if self.partial:
parts += [':partial']
s = ' ' + ' '.join(parts)
if self.comment:
s = '{}# {}'.format(s.ljust(40), self.comment)
return s
[docs] def default_value(self):
"""Get the default value of the field."""
if self.name in tsdb_coded_attributes:
return tsdb_coded_attributes[self.name]
elif self.datatype == ':integer':
return -1
else:
return ''
[docs]class Relation(tuple):
"""
A [incr tsdb()] table schema.
Args:
name: the table name
fields: a list of Field objects
"""
def __new__(cls, name, fields):
tr = super(Relation, cls).__new__(cls, fields)
tr.name = name
tr._index = dict(
(f.name, i) for i, f in enumerate(fields)
)
tr._keys = None
tr.key_indices = tuple(i for i, f in enumerate(fields) if f.key)
return tr
def __contains__(self, name):
return name in self._index
[docs] def index(self, fieldname):
"""Return the Field index given by *fieldname*."""
return self._index[fieldname]
[docs] def keys(self):
"""Return the tuple of field names of key fields."""
keys = self._keys
if keys is None:
keys = tuple(self[i].name for i in self.key_indices)
return keys
class _RelationJoin(Relation):
def __new__(cls, rel1, rel2, on=None):
if set(rel1.name.split('+')).intersection(rel2.name.split('+')):
raise ItsdbError('Cannot join tables with the same name; '
'try renaming the table.')
name = '{}+{}'.format(rel1.name, rel2.name)
# the fields of the joined table, merging shared columns in *on*
if isinstance(on, stringtypes):
on = _split_cols(on)
elif on is None:
on = []
fields = _prefixed_relation_fields(rel1, on, False)
fields.extend(_prefixed_relation_fields(rel2, on, True))
r = super(_RelationJoin, cls).__new__(cls, name, fields)
# reset _keys to be a unique tuple of column-only forms
keys = list(rel1.keys())
seen = set(keys)
for key in rel2.keys():
if key not in seen:
keys.append(key)
seen.add(key)
r._keys = tuple(keys)
return r
def __contains__(self, name):
try:
self.index(name)
except KeyError:
return False
except ItsdbError:
pass # ambiguous field name
return True
def index(self, fieldname):
if ':' not in fieldname:
qfieldnames = []
for table in self.name.split('+'):
qfieldname = table + ':' + fieldname
if qfieldname in self._index:
qfieldnames.append(qfieldname)
if len(qfieldnames) > 1:
raise ItsdbError(
"ambiguous field name; include the table name "
"(e.g., 'item:i-id' instead of 'i-id')")
elif len(qfieldnames) == 1:
fieldname = qfieldnames[0]
else:
pass # lookup should return KeyError
elif fieldname not in self._index:
# join keys don't get prefixed
uqfieldname = fieldname.rpartition(':')[2]
if uqfieldname in self._keys:
fieldname = uqfieldname
return self._index[fieldname]
def _prefixed_relation_fields(fields, on, drop):
prefixed_fields = []
already_joined = isinstance(fields, _RelationJoin)
for f in fields:
table, _, fieldname = f[0].rpartition(':')
if already_joined :
prefix = table + ':' if table else ''
else:
prefix = fields.name + ':'
if fieldname in on and not drop:
prefixed_fields.append(Field(fieldname, *f[1:]))
elif fieldname not in on:
prefixed_fields.append(Field(prefix + fieldname, *f[1:]))
return prefixed_fields
[docs]class Relations(object):
"""
A [incr tsdb()] database schema.
Note:
Use :meth:`from_file` or :meth:`from_string` for instantiating
a Relations object.
Args:
tables: a list of (table, :class:`Relation`) tuples
"""
__slots__ = ('tables', '_data', '_field_map')
def __init__(self, tables):
tables = [(t[0], Relation(*t)) for t in tables]
self.tables = tuple(t[0] for t in tables)
self._data = dict(tables)
self._field_map = _make_field_map(t[1] for t in tables)
[docs] @classmethod
def from_file(cls, source):
"""Instantiate Relations from a relations file."""
if hasattr(source, 'read'):
relations = cls.from_string(source.read())
else:
with open(source) as f:
relations = cls.from_string(f.read())
return relations
[docs] @classmethod
def from_string(cls, s):
"""Instantiate Relations from a relations string."""
tables = []
seen = set()
current_table = None
lines = list(reversed(s.splitlines())) # to pop() in right order
while lines:
line = lines.pop().strip()
table_m = re.match(r'^(?P<table>\w.+):$', line)
field_m = re.match(r'\s*(?P<name>\S+)'
r'(\s+(?P<attrs>[^#]+))?'
r'(\s*#\s*(?P<comment>.*)$)?',
line)
if table_m is not None:
table_name = table_m.group('table')
if table_name in seen:
raise ItsdbError(
'Table {} already defined.'.format(table_name)
)
current_table = (table_name, [])
tables.append(current_table)
seen.add(table_name)
elif field_m is not None and current_table is not None:
name = field_m.group('name')
attrs = field_m.group('attrs').split()
datatype = attrs.pop(0)
key = ':key' in attrs
partial = ':partial' in attrs
comment = field_m.group('comment')
current_table[1].append(
Field(name, datatype, key, partial, comment)
)
elif line != '':
raise ItsdbError('Invalid line: ' + line)
return cls(tables)
def __contains__(self, key):
return key in self._data
def __getitem__(self, key):
return self._data[key]
def __iter__(self):
return iter(self.tables)
def __len__(self):
return len(self.tables)
def __str__(self):
return '\n\n'.join(
'{tablename}:\n{fields}'.format(
tablename=tablename,
fields='\n'.join(str(f) for f in self[tablename])
)
for tablename in self
)
[docs] def items(self):
"""Return a list of (table, :class:`Relation`) for each table."""
return [(table, self[table]) for table in self]
[docs] def find(self, fieldname):
"""
Return the list of tables that define the field *fieldname*.
"""
tablename, _, column = fieldname.rpartition(':')
if tablename and tablename in self._field_map[column]:
return tablename
else:
return self._field_map[fieldname]
[docs] def path(self, source, target):
"""
Find the path of id fields connecting two tables.
This is just a basic breadth-first-search. The relations file
should be small enough to not be a problem.
Returns:
list: (table, fieldname) pairs describing the path from
the source to target tables
Raises:
:class:`delphin.exceptions.ItsdbError`: when no path is
found
Example:
>>> relations.path('item', 'result')
[('parse', 'i-id'), ('result', 'parse-id')]
>>> relations.path('parse', 'item')
[('item', 'i-id')]
>>> relations.path('item', 'item')
[]
"""
visited = set(source.split('+')) # split on + for joins
targets = set(target.split('+')) - visited
# ensure sources and targets exists
for tablename in visited.union(targets):
self[tablename]
# base case; nothing to do
if len(targets) == 0:
return []
paths = [[(tablename, None)] for tablename in visited]
while True:
newpaths = []
for path in paths:
laststep, pivot = path[-1]
if laststep in targets:
return path[1:]
else:
for key in self[laststep].keys():
for step in set(self.find(key)) - visited:
visited.add(step)
newpaths.append(path + [(step, key)])
if newpaths:
paths = newpaths
else:
break
raise ItsdbError('no relation path found from {} to {}'
.format(source, target))
def _make_field_map(rels):
g = {}
for rel in rels:
for field in rel:
g.setdefault(field.name, []).append(rel.name)
return g
##############################################################################
# Test items and test suites
[docs]class Record(list):
"""
A row in a [incr tsdb()] table.
Args:
fields: the Relation schema for the table of this record
iterable: an iterable containing the data for the record
Attributes:
fields (:class:`Relation`): table schema
"""
__slots__ = ('fields', '_tableref', '_rowid')
def __init__(self, fields, iterable):
iterable = list(iterable)
if len(fields) != len(iterable):
raise ItsdbError(
'Incorrect number of column values for {} table: {} != {}\n{}'
.format(fields.name, len(iterable), len(fields), iterable)
)
iterable = [_cast_to_str(val, field)
for val, field in zip(iterable, fields)]
self.fields = fields
self._tableref = None
self._rowid = None
super(Record, self).__init__(iterable)
@classmethod
def _make(cls, fields, iterable, table, rowid):
"""
Create a Record bound to a :class:`Table`.
This is a helper method for creating Records from rows of a
Table that is attached to a file. It is not meant to be called
directly. It specifies the row number and a weak reference to
the Table object so that when the Record is modified it is
kept in the Table's in-memory list (see Record.__setitem__()),
otherwise the changes would not be retained the next time the
record is requested from the Table. The use of a weak
reference to the Table is to avoid a circular reference and
thus allow it to be properly garbage collected.
"""
record = cls(fields, iterable)
record._tableref = weakref.ref(table)
record._rowid = rowid
return record
[docs] @classmethod
def from_dict(cls, fields, mapping):
"""
Create a Record from a dictionary of field mappings.
The *fields* object is used to determine the column indices
of fields in the mapping.
Args:
fields: the Relation schema for the table of this record
mapping: a dictionary or other mapping from field names to
column values
Returns:
a :class:`Record` object
"""
iterable = [None] * len(fields)
for key, value in mapping.items():
try:
index = fields.index(key)
except KeyError:
raise ItsdbError('Invalid field name(s): ' + key)
iterable[index] = value
return cls(fields, iterable)
def __repr__(self):
return "<{} '{}' {}>".format(
self.__class__.__name__,
self.fields.name,
' '.join('{}={}'.format(k, self[k]) for k in self.fields.keys())
)
def __str__(self):
return make_row(self, self.fields)
def __eq__(self, other):
return all(a == b for a, b in zip(self, other))
def __ne__(self, other):
return any(a != b for a, b in zip(self, other))
def __iter__(self):
for raw, field in zip(list.__iter__(self), self.fields):
yield _cast_to_datatype(raw, field)
def __getitem__(self, index):
if not isinstance(index, int):
index = self.fields.index(index)
raw = list.__getitem__(self, index)
field = self.fields[index]
return _cast_to_datatype(raw, field)
def __setitem__(self, index, value):
if not isinstance(index, int):
index = self.fields.index(index)
# record values are strings
value = _cast_to_str(value, self.fields[index])
# should the value be validated against the datatype?
list.__setitem__(self, index, value)
# when a record is modified it should stay in memory
if self._tableref is not None:
assert self._rowid is not None
table = self._tableref()
if table is not None:
table[self._rowid] = self
[docs] def get(self, key, default=None, cast=True):
"""
Return the field data given by field name *key*.
Args:
key: the field name of the data to return
default: the value to return if *key* is not in the row
"""
tablename, _, key = key.rpartition(':')
if tablename and tablename not in self.fields.name.split('+'):
raise ItsdbError('column requested from wrong table: {}'
.format(tablename))
try:
index = self.fields.index(key)
value = list.__getitem__(self, index)
except (KeyError, IndexError):
value = default
else:
if cast:
field = self.fields[index]
value = _cast_to_datatype(value, field)
return value
[docs]class Table(object):
"""
A [incr tsdb()] table.
Instances of this class contain a collection of rows with the data
stored in the database. Generally a Table will be created by a
:class:`TestSuite` object for a database, but a Table can also be
instantiated individually by the :meth:`Table.from_file` class
method, and the relations file in the same directory is used to
get the schema. Tables can also be constructed entirely in-memory
and separate from a testsuite via the standard `Table()`
constructor.
Tables have two modes: **attached** and **detached**. Attached
tables are backed by a file on disk (whether as part of a
testsuite or not) and only store modified records in memory---all
unmodified records are retrieved from disk. Therefore, iterating
over a table is more efficient than random-access. Attached files
use significantly less memory than detached tables but also
require more processing time. Detached tables are entirely stored
in memory and are not backed by a file. They are useful for the
programmatic construction of testsuites (including for unit tests)
and other operations where high-speed random-access is required.
See the :meth:`attach` and :meth:`detach` methods for more
information. The :meth:`is_attached` method is useful for
determining the mode of a table.
Args:
fields: the Relation schema for this table
records: the collection of Record objects containing the table data
Attributes:
name (str): table name
fields (:class:`Relation`): table schema
path (str): if attached, the path to the file containing the
table data; if detached it is `None`
encoding (str): the character encoding of the attached table
file; if detached it is `None`
"""
__slots__ = ('fields', 'path', 'encoding', '_records',
'_last_synced_index', '__weakref__')
def __init__(self, fields, records=None):
self.fields = fields
self.path = None
self.encoding = None
self._records = []
self._last_synced_index = -1
if records is None:
records = []
self.extend(records)
[docs] @classmethod
def from_file(cls, path, fields=None, encoding='utf-8'):
"""
Instantiate a Table from a database file.
This method instantiates a table attached to the file at *path*.
The file will be opened and traversed to determine the number of
records, but the contents will not be stored in memory unless
they are modified.
Args:
path: the path to the table file
fields: the Relation schema for the table (loaded from the
relations file in the same directory if not given)
encoding: the character encoding of the file at *path*
"""
path = _table_filename(path) # do early in case file not found
if fields is None:
fields = _get_relation_from_table_path(path)
table = cls(fields)
table.attach(path, encoding=encoding)
return table
[docs] def write(self, records=None, path=None, fields=None, append=False,
gzip=None):
"""
Write the table to disk.
The basic usage has no arguments and writes the table's data
to the attached file. The parameters accommodate a variety of
use cases, such as using *fields* to refresh a table to a
new schema or *records* and *append* to incrementally build a
table.
Args:
records: an iterable of :class:`Record` objects to write;
if `None` the table's existing data is used
path: the destination file path; if `None` use the
path of the file attached to the table
fields (:class:`Relation`): table schema to use for
writing, otherwise use the current one
append: if `True`, append rather than overwrite
gzip: compress with gzip if non-empty
Examples:
>>> table.write()
>>> table.write(results, path='new/path/result')
"""
if path is None:
if not self.is_attached():
raise ItsdbError('no path given for detached table')
else:
path = self.path
path = _normalize_table_path(path)
dirpath, name = os.path.split(path)
if fields is None:
fields = self.fields
if records is None:
records = iter(self)
_write_table(
dirpath,
name,
records,
fields,
append=append,
gzip=gzip,
encoding=self.encoding)
if self.is_attached() and path == _normalize_table_path(self.path):
self.path = _table_filename(path)
self._sync_with_file()
[docs] def commit(self):
"""
Commit changes to disk if attached.
This method helps normalize the interface for detached and
attached tables and makes writing attached tables a bit more
efficient. For detached tables nothing is done, as there is no
notion of changes, but neither is an error raised (unlike with
:meth:`write`). For attached tables, if all changes are new
records, the changes are appended to the existing file, and
otherwise the whole file is rewritten.
"""
if not self.is_attached():
return
changes = self.list_changes()
if changes:
indices, records = zip(*changes)
if min(indices) > self._last_synced_index:
self.write(records, append=True)
else:
self.write(append=False)
[docs] def attach(self, path, encoding='utf-8'):
"""
Attach the Table to the file at *path*.
Attaching a table to a file means that only changed records
are stored in memory, which greatly reduces the memory
footprint of large profiles at some cost of
performance. Tables created from :meth:`Table.from_file()` or
from an attached :class:`TestSuite` are automatically
attached. Attaching a file does not immediately flush the
contents to disk; after attaching the table must be separately
written to commit the in-memory data.
A non-empty table will fail to attach to a non-empty file to
avoid data loss when merging the contents. In this case, you
may delete or clear the file, clear the table, or attach to
another file.
Args:
path: the path to the table file
encoding: the character encoding of the files in the testsuite
"""
if self.is_attached():
raise ItsdbError('already attached at {}'.format(self.path))
try:
path = _table_filename(path)
except ItsdbError:
# neither path nor path.gz exist; create new empty file
# (note: if the file were non-empty this would be destructive)
path = _normalize_table_path(path)
open(path, 'w').close()
else:
# path or path.gz exists; check if merging would be a problem
if os.stat(path).st_size > 0 and len(self._records) > 0:
raise ItsdbError(
'cannot attach non-empty table to non-empty file')
self.path = path
self.encoding = encoding
# if _records is not empty then we're attaching to an empty file
if len(self._records) == 0:
self._sync_with_file()
[docs] def detach(self):
"""
Detach the table from a file.
Detaching a table reads all data from the file and places it
in memory. This is useful when constructing or significantly
manipulating table data, or when more speed is needed. Tables
created by the default constructor are detached.
When detaching, only unmodified records are loaded from the
file; any uncommited changes in the Table are left as-is.
.. warning::
Very large tables may consume all available RAM when
detached. Expect the in-memory table to take up about
twice the space of an uncompressed table on disk, although
this may vary by system.
"""
if not self.is_attached():
raise ItsdbError('already detached')
records = self._records
for i, line in self._enum_lines():
if records[i] is None:
# check number of columns?
records[i] = tuple(decode_row(line))
self.path = None
self.encoding = None
@property
def name(self):
return self.fields.name
[docs] def is_attached(self):
"""Return `True` if the table is attached to a file."""
return self.path is not None
[docs] def list_changes(self):
"""
Return a list of modified records.
This is only applicable for attached tables.
Returns:
A list of `(row_index, record)` tuples of modified records
Raises:
:class:`delphin.exceptions.ItsdbError`: when called on a
detached table
"""
if not self.is_attached():
raise ItsdbError('changes are not tracked for detached tables.')
return [(i, self[i]) for i, row in enumerate(self._records)
if row is not None]
def _sync_with_file(self):
"""Clear in-memory structures so table is synced with the file."""
self._records = []
i = -1
for i, line in self._enum_lines():
self._records.append(None)
self._last_synced_index = i
def _enum_lines(self):
"""Enumerate lines from the attached file."""
with _open_table(self.path, self.encoding) as lines:
for i, line in enumerate(lines):
yield i, line
def _enum_attached_rows(self, indices):
"""Enumerate on-disk and in-memory records."""
records = self._records
i = 0
# first rows covered by the file
for i, line in self._enum_lines():
if i in indices:
row = records[i]
if row is None:
row = decode_row(line)
yield (i, row)
# then any uncommitted rows
for j in range(i, len(records)):
if j in indices:
if records[j] is not None:
yield (j, records[j])
def __iter__(self):
for record in self._iterslice(slice(None)):
yield record
def __getitem__(self, index):
if isinstance(index, slice):
return list(self._iterslice(index))
else:
return self._getitem(index)
def _iterslice(self, slice):
"""Yield records from a slice index."""
indices = range(*slice.indices(len(self._records)))
if self.is_attached():
rows = self._enum_attached_rows(indices)
if slice.step is not None and slice.step < 0:
rows = reversed(list(rows))
else:
rows = zip(indices, self._records[slice])
fields = self.fields
for i, row in rows:
yield Record._make(fields, row, self, i)
def _getitem(self, index):
"""Get a single non-slice index."""
row = self._records[index]
if row is not None:
pass
elif self.is_attached():
# need to handle negative indices manually
if index < 0:
index = len(self._records) + index
row = next((decode_row(line)
for i, line in self._enum_lines()
if i == index),
None)
if row is None:
raise ItsdbError('could not retrieve row in attached table')
else:
raise ItsdbError('invalid row in detached table: {}'.format(index))
return Record._make(self.fields, row, self, index)
def __setitem__(self, index, value):
# first normalize the arguments for slices and regular indices
if isinstance(index, slice):
values = list(value)
else:
self._records[index] # check for IndexError
values = [value]
index = slice(index, index + 1)
# now prepare the records for being in a table
fields = self.fields
for i, record in enumerate(values):
values[i] = _cast_record_to_str_tuple(record, fields)
self._records[index] = values
def __len__(self):
return len(self._records)
[docs] def append(self, record):
"""
Add *record* to the end of the table.
Args:
record: a :class:`Record` or other iterable containing
column values
"""
self.extend([record])
[docs] def extend(self, records):
"""
Add each record in *records* to the end of the table.
Args:
record: an iterable of :class:`Record` or other iterables
containing column values
"""
fields = self.fields
for record in records:
record = _cast_record_to_str_tuple(record, fields)
self._records.append(record)
[docs] def select(self, cols, mode='list'):
"""
Select columns from each row in the table.
See :func:`select_rows` for a description of how to use the
*mode* parameter.
Args:
cols: an iterable of Field (column) names
mode: how to return the data
"""
if isinstance(cols, stringtypes):
cols = _split_cols(cols)
if not cols:
cols = [f.name for f in self.fields]
return select_rows(cols, self, mode=mode)
def _normalize_table_path(path):
if path[-3:].lower() == '.gz':
path = path[:-3]
return path
def _get_relation_from_table_path(path):
rpath = os.path.join(os.path.dirname(path), _relations_filename)
if not os.path.exists(rpath):
raise ItsdbError(
'No relation is specified and a relations file could '
'not be found.'
)
rels = Relations.from_file(rpath)
name = os.path.basename(_normalize_table_path(path))
if name not in rels:
raise ItsdbError(
'Table \'{}\' not found in the relations.'.format(name)
)
# successfully inferred the relations for the table
return rels[name]
def _cast_record_to_str_tuple(record, fields):
if len(record) != len(fields):
raise ItsdbError('wrong number of fields')
return tuple(_cast_to_str(value, field)
for value, field in zip(record, fields))
[docs]class TestSuite(object):
"""
A [incr tsdb()] testsuite database.
Args:
path: the path to the testsuite's directory
relations (:class:`Relations`, str): the database schema; either
a :class:`Relations` object or a path to a relations file;
if not given, the relations file under *path* will be used
encoding: the character encoding of the files in the testsuite
Attributes:
encoding (:py:class:`str`): character encoding used when reading and
writing tables
relations (:class:`Relations`): database schema
"""
__slots__ = ('_path', 'relations', '_data', 'encoding')
def __init__(self, path=None, relations=None, encoding='utf-8'):
self._path = path
self.encoding = encoding
if isinstance(relations, Relations):
self.relations = relations
elif relations is None and path is not None:
relations = os.path.join(self._path, _relations_filename)
self.relations = Relations.from_file(relations)
elif relations is not None and os.path.isfile(relations):
self.relations = Relations.from_file(relations)
else:
raise ItsdbError(
'Either the relations parameter must be provided or '
'*path* must point to a directory with a relations file.'
)
self._data = dict((t, None) for t in self.relations)
if self._path is not None:
self.reload()
def __getitem__(self, tablename):
# if the table is None it is invalidated; reload it
if self._data[tablename] is None:
if self._path is not None:
self._reload_table(tablename)
else:
self._data[tablename] = Table(
self.relations[tablename]
)
return self._data[tablename]
[docs] def reload(self):
"""Discard temporary changes and reload the database from disk."""
if self._path is None:
raise ItsdbError('cannot reload an in-memory testsuite')
for tablename in self.relations:
self._reload_table(tablename)
def _reload_table(self, tablename):
# assumes self.path is not None
fields = self.relations[tablename]
path = os.path.join(self._path, tablename)
try:
path = _table_filename(path)
except ItsdbError:
# path doesn't exist
path = _normalize_table_path(path)
open(path, 'w').close() # create empty file
table = Table.from_file(path,
fields=fields,
encoding=self.encoding)
self._data[tablename] = table
[docs] def select(self, arg, cols=None, mode='list'):
"""
Select columns from each row in the table.
The first parameter, *arg*, may either be a table name or a
data specifier. If the former, the *cols* parameter selects the
columns from the table. If the latter, *cols* is left
unspecified and both the table and columns are taken from the
data specifier; e.g., `select('item:i-id@i-input')` is
equivalent to `select('item', ('i-id', 'i-input'))`.
See select_rows() for a description of how to use the *mode*
parameter.
Args:
arg: a table name, if *cols* is specified, otherwise a data
specifier
cols: an iterable of Field (column) names
mode: how to return the data
"""
if cols is None:
table, cols = get_data_specifier(arg)
else:
table = arg
if cols is None:
cols = [f.name for f in self.relations[table]]
return select_rows(cols, self[table], mode=mode)
[docs] def write(self, tables=None, path=None, relations=None,
append=False, gzip=None):
"""
Write the testsuite to disk.
Args:
tables: a name or iterable of names of tables to write,
or a Mapping of table names to table data; if `None`,
all tables will be written
path: the destination directory; if `None` use the path
assigned to the TestSuite
relations: a :class:`Relations` object or path to a
relations file to be used when writing the tables
append: if `True`, append to rather than overwrite tables
gzip: compress non-empty tables with gzip
Examples:
>>> ts.write(path='new/path')
>>> ts.write('item')
>>> ts.write(['item', 'parse', 'result'])
>>> ts.write({'item': item_rows})
"""
if path is None:
path = self._path
if tables is None:
tables = self._data
elif isinstance(tables, stringtypes):
tables = {tables: self[tables]}
elif isinstance(tables, Mapping):
pass
elif isinstance(tables, (Sequence, set)):
tables = dict((table, self[table]) for table in tables)
if relations is None:
relations = self.relations
elif isinstance(relations, stringtypes):
relations = Relations.from_file(relations)
# prepare destination
if not os.path.exists(path):
os.makedirs(path)
# raise error if path != self._path?
with open(os.path.join(path, _relations_filename), 'w') as fh:
print(str(relations), file=fh)
for tablename, fields in relations.items():
if tablename in tables:
data = tables[tablename]
# reload table from disk if it is invalidated
if data is None:
data = self[tablename]
elif not isinstance(data, Table):
data = Table(fields, data)
_write_table(
path,
tablename,
data,
fields,
append=append,
gzip=gzip,
encoding=self.encoding
)
[docs] def exists(self, table=None):
"""
Return `True` if the testsuite or a table exists on disk.
If *table* is `None`, this method returns `True` if the
:attr:`TestSuite.path` is specified and points to an existing
directory containing a valid relations file. If *table* is
given, the function returns `True` if, in addition to the
above conditions, the table exists as a file (even if
empty). Otherwise it returns False.
"""
if self._path is None or not os.path.isdir(self._path):
return False
if not os.path.isfile(os.path.join(self._path, _relations_filename)):
return False
if table is not None:
try:
_table_filename(os.path.join(self._path, table))
except ItsdbError:
return False
return True
[docs] def size(self, table=None):
"""
Return the size, in bytes, of the testsuite or *table*.
If *table* is `None`, return the size of the whole testsuite
(i.e., the sum of the table sizes). Otherwise, return the
size of *table*.
Notes:
* If the file is gzipped, it returns the compressed size.
* Only tables on disk are included.
"""
size = 0
if table is None:
for table in self.relations:
size += self.size(table)
else:
try:
fn = _table_filename(os.path.join(self._path, table))
size += os.stat(fn).st_size
except ItsdbError:
pass
return size
[docs] def process(self, cpu, selector=None, source=None, fieldmapper=None,
gzip=None, buffer_size=1000):
"""
Process each item in a [incr tsdb()] testsuite
If the testsuite is attached to files on disk, the output
records will be flushed to disk when the number of new records
in a table is *buffer_size*. If the testsuite is not attached
to files or *buffer_size* is set to `None`, records are kept
in memory and not flushed to disk.
Args:
cpu (:class:`~delphin.interfaces.base.Processor`):
processor interface (e.g.,
:class:`~delphin.interfaces.ace.AceParser`)
selector (str): data specifier to select a single table and
column as processor input (e.g., `"item:i-input"`)
source (:class:`TestSuite`, :class:`Table`): testsuite or
table from which inputs are taken; if `None`, use `self`
fieldmapper (:class:`~delphin.interfaces.base.FieldMapper`):
object for mapping response fields to [incr tsdb()]
fields; if `None`, use a default mapper for the
standard schema
gzip: compress non-empty tables with gzip
buffer_size (int): number of output records to hold in
memory before flushing to disk; ignored if the testsuite
is all in-memory; if `None`, do not flush to disk
Examples:
>>> ts.process(ace_parser)
>>> ts.process(ace_generator, 'result:mrs', source=ts2)
"""
if selector is None:
selector = _default_task_input_selectors.get(cpu.task)
if source is None:
source = self
if fieldmapper is None:
fieldmapper = FieldMapper()
if self._path is None:
buffer_size = None
tables = set(fieldmapper.affected_tables).intersection(self.relations)
_prepare_target(self, tables, buffer_size)
source, cols = _prepare_source(selector, source)
key_cols = cols[:-1]
for item in select_rows(cols, source, mode='list'):
datum = item.pop()
keys = dict(zip(key_cols, item))
response = cpu.process_item(datum, keys=keys)
logging.info(
'Processed item {:>16} {:>8} results'
.format(encode_row(item), len(response['results']))
)
for tablename, data in fieldmapper.map(response):
_add_record(self[tablename], data, buffer_size)
for tablename, data in fieldmapper.cleanup():
_add_record(self[tablename], data, buffer_size)
# finalize data if writing to disk
for tablename in tables:
table = self[tablename]
if buffer_size is not None:
table.write(gzip=gzip)
def _prepare_target(ts, tables, buffer_size):
"""Clear tables affected by the processing."""
for tablename in tables:
table = ts[tablename]
table[:] = []
if buffer_size is not None and table.is_attached():
table.write(append=False)
def _prepare_source(selector, source):
"""Normalize source rows and selectors."""
tablename, fields = get_data_specifier(selector)
if len(fields) != 1:
raise ItsdbError(
'Selector must specify exactly one data column: {}'
.format(selector)
)
if isinstance(source, TestSuite):
if not tablename:
tablename = source.relations.find(fields[0])[0]
source = source[tablename]
cols = list(source.fields.keys()) + fields
return source, cols
def _add_record(table, data, buffer_size):
"""
Prepare and append a Record into its Table; flush to disk if necessary.
"""
fields = table.fields
# remove any keys that aren't relation fields
for invalid_key in set(data).difference([f.name for f in fields]):
del data[invalid_key]
table.append(Record.from_dict(fields, data))
# write if requested and possible
if buffer_size is not None and table.is_attached():
# for now there isn't a public method to get the number of new
# records, so use private members
if (len(table) - 1) - table._last_synced_index > buffer_size:
table.commit()
##############################################################################
# Non-class (i.e. static) functions
data_specifier_re = re.compile(r'(?P<table>[^:]+)?(:(?P<cols>.+))?$')
[docs]def get_data_specifier(string):
"""
Return a tuple (table, col) for some [incr tsdb()] data specifier.
For example::
item -> ('item', None)
item:i-input -> ('item', ['i-input'])
item:i-input@i-wf -> ('item', ['i-input', 'i-wf'])
:i-input -> (None, ['i-input'])
(otherwise) -> (None, None)
"""
match = data_specifier_re.match(string)
if match is None:
return (None, None)
table = match.group('table')
if table is not None:
table = table.strip()
cols = _split_cols(match.group('cols'))
return (table, cols)
def _split_cols(colstring):
if not colstring:
return None
colstring = colstring.lstrip(':')
return [col.strip() for col in colstring.split('@')]
[docs]def decode_row(line, fields=None):
"""
Decode a raw line from a profile into a list of column values.
Decoding involves splitting the line by the field delimiter
(`"@"` by default) and unescaping special characters. If *fields*
is given, cast the values into the datatype given by their
respective Field object.
Args:
line: a raw line from a [incr tsdb()] profile.
fields: a list or Relation object of Fields for the row
Returns:
A list of column values.
"""
cols = line.rstrip('\n').split(_field_delimiter)
cols = list(map(unescape, cols))
if fields is not None:
if len(cols) != len(fields):
raise ItsdbError(
'Wrong number of fields: {} != {}'
.format(len(cols), len(fields))
)
for i in range(len(cols)):
col = cols[i]
if col:
field = fields[i]
col = _cast_to_datatype(col, field)
cols[i] = col
return cols
def _cast_to_datatype(col, field):
if col is None:
col = field.default_value()
else:
dt = field.datatype
if dt == ':integer':
col = int(col)
elif dt == ':float':
col = float(col)
elif dt == ':date':
dt = parse_datetime(col)
col = dt if dt is not None else col
# other casts? :position?
return col
def _cast_to_str(col, field):
if col is None:
if field.key:
raise ItsdbError('missing key: {}'.format(field.name))
col = field.default_value()
return unicode(col)
[docs]def encode_row(fields):
"""
Encode a list of column values into a [incr tsdb()] profile line.
Encoding involves escaping special characters for each value, then
joining the values into a single string with the field delimiter
(`"@"` by default). It does not fill in default values (see
make_row()).
Args:
fields: a list of column values
Returns:
A [incr tsdb()]-encoded string
"""
# NOTE: str(f) only works for Python3
unicode_fields = [unicode(f) for f in fields]
escaped_fields = map(escape, unicode_fields)
return _field_delimiter.join(escaped_fields)
[docs]def escape(string):
r"""
Replace any special characters with their [incr tsdb()] escape
sequences. The characters and their escape sequences are::
@ -> \s
(newline) -> \n
\ -> \\
Also see :func:`unescape`
Args:
string: the string to escape
Returns:
The escaped string
"""
# str.replace()... is about 3-4x faster than re.sub() here
return (string
.replace('\\', '\\\\') # must be done first
.replace('\n', '\\n')
.replace(_field_delimiter, '\\s'))
[docs]def unescape(string):
"""
Replace [incr tsdb()] escape sequences with the regular equivalents.
Also see :func:`escape`.
Args:
string (str): the escaped string
Returns:
The string with escape sequences replaced
"""
# str.replace()... is about 3-4x faster than re.sub() here
return (string
.replace('\\\\','\\') # must be done first
.replace('\\n','\n')
.replace('\\s', _field_delimiter))
def _table_filename(tbl_filename):
"""
Determine if the table path should end in .gz or not and return it.
A .gz path is preferred only if it exists and is newer than any
regular text file path.
Raises:
:class:`delphin.exceptions.ItsdbError`: when neither the .gz
nor text file exist.
"""
tbl_filename = str(tbl_filename) # convert any Path objects
txfn = _normalize_table_path(tbl_filename)
gzfn = txfn + '.gz'
if os.path.exists(txfn):
if (os.path.exists(gzfn) and
os.stat(gzfn).st_mtime > os.stat(txfn).st_mtime):
tbl_filename = gzfn
else:
tbl_filename = txfn
elif os.path.exists(gzfn):
tbl_filename = gzfn
else:
raise ItsdbError(
'Table does not exist at {}(.gz)'
.format(tbl_filename)
)
return tbl_filename
@contextmanager
def _open_table(tbl_filename, encoding):
"""
Transparently open the compressed or text table file.
Can be used as a context manager in a 'with' statement.
"""
path = _table_filename(tbl_filename)
if path.endswith('.gz'):
# gzip.open() cannot use mode='rt' until Python2.7 support
# is gone; until then use TextIOWrapper
gzfile = GzipFile(path, mode='r')
gzfile.read1 = gzfile.read # Python2 hack
with TextIOWrapper(gzfile, encoding=encoding) as f:
yield f
else:
with io.open(path, encoding=encoding) as f:
yield f
def _write_table(profile_dir, table_name, rows, fields,
append=False, gzip=False, encoding='utf-8'):
# don't gzip if empty
rows = iter(rows)
try:
first_row = next(rows)
except StopIteration:
gzip = False
else:
rows = chain([first_row], rows)
if encoding is None:
encoding = 'utf-8'
if gzip and append:
logging.warning('Appending to a gzip file may result in '
'inefficient compression.')
if not os.path.isdir(profile_dir):
raise ItsdbError('Profile directory does not exist: {}'
.format(profile_dir))
with tempfile.NamedTemporaryFile(
mode='w+b', suffix='.tmp',
prefix=table_name, dir=profile_dir) as f_tmp:
for row in rows:
f_tmp.write((make_row(row, fields) + '\n').encode(encoding))
f_tmp.seek(0)
txfn = os.path.join(profile_dir, table_name)
gzfn = txfn + '.gz'
mode = 'ab' if append else 'wb'
if gzip:
# clean up non-gzip files, if any
if os.path.isfile(txfn):
os.remove(txfn)
with gzopen(gzfn, mode) as f_out:
shutil.copyfileobj(f_tmp, f_out)
else:
# clean up gzip files, if any
if os.path.isfile(gzfn):
os.remove(gzfn)
with open(txfn, mode=mode) as f_out:
shutil.copyfileobj(f_tmp, f_out)
[docs]def make_row(row, fields):
"""
Encode a mapping of column name to values into a [incr tsdb()]
profile line. The *fields* parameter determines what columns are
used, and default values are provided if a column is missing from
the mapping.
Args:
row: a mapping of column names to values
fields: an iterable of :class:`Field` objects
Returns:
A [incr tsdb()]-encoded string
"""
if not hasattr(row, 'get'):
row = {f.name: col for f, col in zip(fields, row)}
row_fields = []
for f in fields:
val = row.get(f.name, None)
if val is None:
val = str(f.default_value())
row_fields.append(val)
return encode_row(row_fields)
[docs]def select_rows(cols, rows, mode='list', cast=True):
"""
Yield data selected from rows.
It is sometimes useful to select a subset of data from a profile.
This function selects the data in *cols* from *rows* and yields it
in a form specified by *mode*. Possible values of *mode* are:
================== ================= ==========================
mode description example `['i-id', 'i-wf']`
================== ================= ==========================
`'list'` (default) a list of values `[10, 1]`
`'dict'` col to value map `{'i-id': 10,'i-wf': 1}`
`'row'` [incr tsdb()] row `'10@1'`
================== ================= ==========================
Args:
cols: an iterable of column names to select data for
rows: the rows to select column data from
mode: the form yielded data should take
cast: if `True`, cast column values to their datatype
(requires *rows* to be :class:`Record` objects)
Yields:
Selected data in the form specified by *mode*.
"""
mode = mode.lower()
if mode == 'list':
modecast = lambda cols, data: data
elif mode == 'dict':
modecast = lambda cols, data: dict(zip(cols, data))
elif mode == 'row':
modecast = lambda cols, data: encode_row(data)
else:
raise ItsdbError('Invalid mode for select operation: {}\n'
' Valid options include: list, dict, row'
.format(mode))
for row in rows:
try:
data = [row.get(c, cast=cast) for c in cols]
except TypeError:
data = [row.get(c) for c in cols]
yield modecast(cols, data)
[docs]def match_rows(rows1, rows2, key, sort_keys=True):
"""
Yield triples of `(value, left_rows, right_rows)` where
`left_rows` and `right_rows` are lists of rows that share the same
column value for *key*. This means that both *rows1* and *rows2*
must have a column with the same name *key*.
.. warning::
Both *rows1* and *rows2* will exist in memory for this
operation, so it is not recommended for very large tables on
low-memory systems.
Args:
rows1: a :class:`Table` or list of :class:`Record` objects
rows2: a :class:`Table` or list of :class:`Record` objects
key (str): the column name on which to match
sort_keys (bool): if `True`, yield matching rows sorted by the
matched key instead of the original order
"""
matched = OrderedDict()
for i, rows in enumerate([rows1, rows2]):
for row in rows:
val = row[key]
try:
data = matched[val]
except KeyError:
matched[val] = ([], [])
data = matched[val]
data[i].append(row)
vals = matched.keys()
if sort_keys:
vals = sorted(vals, key=safe_int)
for val in vals:
left, right = matched[val]
yield (val, left, right)
[docs]def join(table1, table2, on=None, how='inner', name=None):
"""
Join two tables and return the resulting Table object.
Fields in the resulting table have their names prefixed with their
corresponding table name. For example, when joining `item` and
`parse` tables, the `i-input` field of the `item` table will be
named `item:i-input` in the resulting Table. Pivot fields (those
in *on*) are only stored once without the prefix.
Both inner and left joins are possible by setting the *how*
parameter to `inner` and `left`, respectively.
.. warning::
Both *table2* and the resulting joined table will exist in
memory for this operation, so it is not recommended for very
large tables on low-memory systems.
Args:
table1 (:class:`Table`): the left table to join
table2 (:class:`Table`): the right table to join
on (str): the shared key to use for joining; if `None`, find
shared keys using the schemata of the tables
how (str): the method used for joining (`"inner"` or `"left"`)
name (str): the name assigned to the resulting table
"""
if how not in ('inner', 'left'):
ItsdbError('Only \'inner\' and \'left\' join methods are allowed.')
# validate and normalize the pivot
on = _join_pivot(on, table1, table2)
# the fields of the joined table
fields = _RelationJoin(table1.fields, table2.fields, on=on)
# get key mappings to the right side (useful for inner and left joins)
get_key = lambda rec: tuple(rec.get(k) for k in on)
key_indices = set(table2.fields.index(k) for k in on)
right = defaultdict(list)
for rec in table2:
right[get_key(rec)].append([c for i, c in enumerate(rec)
if i not in key_indices])
# build joined table
rfill = [f.default_value() for f in table2.fields if f.name not in on]
joined = []
for lrec in table1:
k = get_key(lrec)
if how == 'left' or k in right:
joined.extend(lrec + rrec for rrec in right.get(k, [rfill]))
return Table(fields, joined)
def _join_pivot(on, table1, table2):
if isinstance(on, stringtypes):
on = _split_cols(on)
if not on:
on = set(table1.fields.keys()).intersection(table2.fields.keys())
if not on:
raise ItsdbError(
'No shared key to join on in the \'{}\' and \'{}\' tables.'
.format(table1.name, table2.name)
)
return sorted(on)
##############################################################################
# Deprecated
[docs]@deprecated(final_version='1.0.0', alternative='Relations.from_file(path)')
def get_relations(path):
"""
Parse the relations file and return a Relations object that
describes the database structure.
**Note**: for backward-compatibility only; use Relations.from_file()
Args:
path: The path of the relations file.
Returns:
A dictionary mapping a table name to a list of Field tuples.
.. deprecated:: v0.7.0
"""
return Relations.from_file(path)
[docs]@deprecated(final_version='1.0.0', alternative='Field.default_value()')
def default_value(fieldname, datatype):
"""
Return the default value for a column.
If the column name (e.g. *i-wf*) is defined to have an idiosyncratic
value, that value is returned. Otherwise the default value for the
column's datatype is returned.
Args:
fieldname: the column name (e.g. `i-wf`)
datatype: the datatype of the column (e.g. `:integer`)
Returns:
The default value for the column.
.. deprecated:: v0.7.0
"""
if fieldname in tsdb_coded_attributes:
return str(tsdb_coded_attributes[fieldname])
else:
return _default_datatype_values.get(datatype, '')
[docs]@deprecated(final_version='1.0.0')
def make_skeleton(path, relations, item_rows, gzip=False):
"""
Instantiate a new profile skeleton (only the relations file and
item file) from an existing relations file and a list of rows
for the item table. For standard relations files, it is suggested
to have, as a minimum, the `i-id` and `i-input` fields in the
item rows.
Args:
path: the destination directory of the skeleton---must not
already exist, as it will be created
relations: the path to the relations file
item_rows: the rows to use for the item file
gzip: if `True`, the item file will be compressed
Returns:
An ItsdbProfile containing the skeleton data (but the profile
data will already have been written to disk).
Raises:
:class:`delphin.exceptions.ItsdbError`: if the destination
directory could not be created.
.. deprecated:: v0.7.0
"""
try:
os.makedirs(path)
except OSError:
raise ItsdbError('Path already exists: {}.'.format(path))
import shutil
shutil.copyfile(relations, os.path.join(path, _relations_filename))
prof = ItsdbProfile(path, index=False)
prof.write_table('item', item_rows, gzip=gzip)
return prof
[docs]@deprecated(final_version='1.0.0')
def filter_rows(filters, rows):
"""
Yield rows matching all applicable filters.
Filter functions have binary arity (e.g. `filter(row, col)`) where
the first parameter is the dictionary of row data, and the second
parameter is the data at one particular column.
Args:
filters: a tuple of (cols, filter_func) where filter_func will
be tested (filter_func(row, col)) for each col in cols where
col exists in the row
rows: an iterable of rows to filter
Yields:
Rows matching all applicable filters
.. deprecated:: v0.7.0
"""
for row in rows:
if all(condition(row, row.get(col))
for (cols, condition) in filters
for col in cols
if col is None or col in row):
yield row
[docs]@deprecated(final_version='1.0.0')
def apply_rows(applicators, rows):
"""
Yield rows after applying the applicator functions to them.
Applicators are simple unary functions that return a value, and that
value is stored in the yielded row. E.g.
`row[col] = applicator(row[col])`. These are useful to, e.g., cast
strings to numeric datatypes, to convert formats stored in a cell,
extract features for machine learning, and so on.
Args:
applicators: a tuple of (cols, applicator) where the applicator
will be applied to each col in cols
rows: an iterable of rows for applicators to be called on
Yields:
Rows with specified column values replaced with the results of
the applicators
.. deprecated:: v0.7.0
"""
for row in rows:
for (cols, function) in applicators:
for col in (cols or []):
value = row.get(col, '')
row[col] = function(row, value)
yield row
[docs]class ItsdbProfile(object):
"""
A [incr tsdb()] profile, analyzed and ready for reading or writing.
Args:
path: The path of the directory containing the profile
filters: A list of tuples [(table, cols, condition)] such
that only rows in table where condition(row, row[col])
evaluates to a non-false value are returned; filters are
tested in order for a table.
applicators: A list of tuples [(table, cols, function)]
which will be used when reading rows from a table---the
function will be applied to the contents of the column
cell in the table. For each table, each column-function
pair will be applied in order. Applicators apply after
the filters.
index: If `True`, indices are created based on the keys of
each table.
cast: if `True`, automatically cast data into the type defined
by its relation field (e.g., :integer)
.. deprecated:: v0.7.0
"""
# _tables is a list of table names to consider (for indexing, writing,
# etc.). If `None`, all present in the relations file and on disk are
# considered. Otherwise, only those present in the list are considered.
_tables = None
@deprecated("The 'ItsdbProfile' class is deprecated "
"and will be removed from version {version}; "
"use the following instead: {alternative}",
final_version='1.0.0',
alternative='TestSuite')
def __init__(self, path, relations=None,
filters=None, applicators=None, index=True,
cast=False, encoding='utf-8'):
self.root = path
self.cast = cast
self.encoding = encoding
# somewhat backwards-compatible resolution of relations file
if isinstance(relations, dict):
self.relations = relations
else:
if relations is None:
relations = os.path.join(self.root, _relations_filename)
self.relations = Relations.from_file(relations)
if self._tables is None:
self._tables = list(self.relations)
self.filters = defaultdict(list)
self.applicators = defaultdict(list)
self._index = dict()
for (table, cols, condition) in (filters or []):
self.add_filter(table, cols, condition)
for (table, cols, function) in (applicators or []):
self.add_applicator(table, cols, function)
if index:
self._build_index()
[docs] def add_filter(self, table, cols, condition):
"""
Add a filter. When reading *table*, rows in *table* will be
filtered by filter_rows().
Args:
table: The table the filter applies to.
cols: The columns in *table* to filter on.
condition: The filter function.
"""
if table is not None and table not in self.relations:
raise ItsdbError('Cannot add filter; table "{}" is not defined '
'by the relations file.'
.format(table))
# this is a hack, though perhaps well-motivated
if cols is None:
cols = [None]
self.filters[table].append((cols, condition))
[docs] def add_applicator(self, table, cols, function):
"""
Add an applicator. When reading *table*, rows in *table* will be
modified by apply_rows().
Args:
table: The table to apply the function to.
cols: The columns in *table* to apply the function on.
function: The applicator function.
"""
if table not in self.relations:
raise ItsdbError('Cannot add applicator; table "{}" is not '
'defined by the relations file.'
.format(table))
if cols is None:
raise ItsdbError('Cannot add applicator; columns not specified.')
fields = set(f.name for f in self.relations[table])
for col in cols:
if col not in fields:
raise ItsdbError('Cannot add applicator; column "{}" not '
'defined by the relations file.'
.format(col))
self.applicators[table].append((cols, function))
def _build_index(self):
self._index = {key: None for key, _ in _primary_keys}
tables = self._tables
if tables is not None:
tables = set(tables)
for (keyname, table) in _primary_keys:
if table in tables:
ids = set()
try:
for row in self.read_table(table):
key = row[keyname]
ids.add(key)
except ItsdbError:
logging.info('Failed to index {}.'.format(table))
self._index[keyname] = ids
def table_relations(self, table):
if table not in self.relations:
raise ItsdbError(
'Table {} is not defined in the profiles relations.'
.format(table)
)
return self.relations[table]
[docs] def read_raw_table(self, table):
"""
Yield rows in the [incr tsdb()] *table*. A row is a dictionary
mapping column names to values. Data from a profile is decoded
by decode_row(). No filters or applicators are used.
"""
fields = self.table_relations(table) if self.cast else None
field_names = [f.name for f in self.table_relations(table)]
field_len = len(field_names)
table_path = os.path.join(self.root, table)
with _open_table(table_path, self.encoding) as tbl:
for line in tbl:
cols = decode_row(line, fields=fields)
if len(cols) != field_len:
# should this throw an exception instead?
logging.error('Number of stored fields ({}) '
'differ from the expected number({}); '
'fields may be misaligned!'
.format(len(cols), field_len))
row = OrderedDict(zip(field_names, cols))
yield row
[docs] def read_table(self, table, key_filter=True):
"""
Yield rows in the [incr tsdb()] *table* that pass any defined
filters, and with values changed by any applicators. If no
filters or applicators are defined, the result is the same as
from ItsdbProfile.read_raw_table().
"""
filters = self.filters[None] + self.filters[table]
if key_filter:
for f in self.relations[table]:
key = f.name
if f.key and (self._index.get(key) is not None):
ids = self._index[key]
# Can't keep local variables (like ids) in the scope of
# the lambda expression, so make it a default argument.
# Source: http://stackoverflow.com/a/938493/1441112
function = lambda r, x, ids=ids: x in ids
filters.append(([key], function))
applicators = self.applicators[table]
rows = self.read_raw_table(table)
return filter_rows(filters, apply_rows(applicators, rows))
[docs] def select(self, table, cols, mode='list', key_filter=True):
"""
Yield selected rows from *table*. This method just calls
select_rows() on the rows read from *table*.
"""
if cols is None:
cols = [c.name for c in self.relations[table]]
rows = self.read_table(table, key_filter=key_filter)
for row in select_rows(cols, rows, mode=mode):
yield row
[docs] def join(self, table1, table2, key_filter=True):
"""
Yield rows from a table built by joining *table1* and *table2*.
The column names in the rows have the original table name
prepended and separated by a colon. For example, joining tables
'item' and 'parse' will result in column names like
'item:i-input' and 'parse:parse-id'.
"""
get_keys = lambda t: (f.name for f in self.relations[t] if f.key)
keys = set(get_keys(table1)).intersection(get_keys(table2))
if not keys:
raise ItsdbError(
'Cannot join tables "{}" and "{}"; no shared key exists.'
.format(table1, table2)
)
key = keys.pop()
# this join method stores the whole of table2 in memory, but it is
# MUCH faster than a nested loop method. Most profiles will fit in
# memory anyway, so it's a decent tradeoff
table2_data = defaultdict(list)
for row in self.read_table(table2, key_filter=key_filter):
table2_data[row[key]].append(row)
for row1 in self.read_table(table1, key_filter=key_filter):
for row2 in table2_data.get(row1[key], []):
joinedrow = OrderedDict(
[('{}:{}'.format(table1, k), v)
for k, v in row1.items()] +
[('{}:{}'.format(table2, k), v)
for k, v in row2.items()]
)
yield joinedrow
[docs] def write_table(self, table, rows, append=False, gzip=False):
"""
Encode and write out *table* to the profile directory.
Args:
table: The name of the table to write
rows: The rows to write to the table
append: If `True`, append the encoded rows to any existing
data.
gzip: If `True`, compress the resulting table with `gzip`.
The table's filename will have `.gz` appended.
"""
_write_table(self.root,
table,
rows,
self.table_relations(table),
append=append,
gzip=gzip,
encoding=self.encoding)
[docs] def write_profile(self, profile_directory, relations_filename=None,
key_filter=True,
append=False, gzip=None):
"""
Write all tables (as specified by the relations) to a profile.
Args:
profile_directory: The directory of the output profile
relations_filename: If given, read and use the relations
at this path instead of the current profile's
relations
key_filter: If True, filter the rows by keys in the index
append: If `True`, append profile data to existing tables
in the output profile directory
gzip: If `True`, compress tables using `gzip`. Table
filenames will have `.gz` appended. If `False`, only
write out text files. If `None`, use whatever the
original file was.
"""
if relations_filename:
src_rels = os.path.abspath(relations_filename)
relations = get_relations(relations_filename)
else:
src_rels = os.path.abspath(os.path.join(self.root,
_relations_filename))
relations = self.relations
tgt_rels = os.path.abspath(os.path.join(profile_directory,
_relations_filename))
if not (os.path.isfile(tgt_rels) and src_rels == tgt_rels):
with open(tgt_rels, 'w') as rel_fh:
print(open(src_rels).read(), file=rel_fh)
tables = self._tables
if tables is not None:
tables = set(tables)
for table, fields in relations.items():
if tables is not None and table not in tables:
continue
try:
fn = _table_filename(os.path.join(self.root, table))
_gzip = gzip if gzip is not None else fn.endswith('.gz')
rows = list(self.read_table(table, key_filter=key_filter))
_write_table(
profile_directory, table, rows, fields,
append=append, gzip=_gzip, encoding=self.encoding
)
except ItsdbError:
logging.warning(
'Could not write "{}"; table doesn\'t exist.'.format(table)
)
continue
self._cleanup(gzip=gzip)
[docs] def exists(self, table=None):
"""
Return True if the profile or a table exist.
If *table* is `None`, this function returns True if the root
directory exists and contains a valid relations file. If *table*
is given, the function returns True if the table exists as a
file (even if empty). Otherwise it returns False.
"""
if not os.path.isdir(self.root):
return False
if not os.path.isfile(os.path.join(self.root, _relations_filename)):
return False
if table is not None:
try:
_table_filename(os.path.join(self.root, table))
except ItsdbError:
return False
return True
[docs] def size(self, table=None):
"""
Return the size, in bytes, of the profile or *table*.
If *table* is `None`, this function returns the size of the
whole profile (i.e. the sum of the table sizes). Otherwise, it
returns the size of *table*.
Note: if the file is gzipped, it returns the compressed size.
"""
size = 0
if table is None:
for table in self.relations:
size += self.size(table)
else:
try:
fn = _table_filename(os.path.join(self.root, table))
size += os.stat(fn).st_size
except ItsdbError:
pass
return size
def _cleanup(self, gzip=None):
for table in self.relations:
txfn = os.path.join(self.root, table)
gzfn = os.path.join(self.root, table + '.gz')
if os.path.isfile(txfn) and os.path.isfile(gzfn):
if gzip is True:
os.remove(txfn)
elif gzip is False:
os.remove(gzfn)
elif os.stat(txfn).st_mtime < os.stat(gzfn).st_mtime:
os.remove(txfn)
else:
os.remove(gzfn)
[docs]class ItsdbSkeleton(ItsdbProfile):
"""
A [incr tsdb()] skeleton, analyzed and ready for reading or writing.
See :class:`ItsdbProfile` for initialization parameters.
.. deprecated:: v0.7.0
"""
_tables = tsdb_core_files
@deprecated(final_version='1.0.0', alternative='TestSuite')
def __init__(self, path, relations=None,
filters=None, applicators=None, index=True,
cast=False, encoding='utf-8'):
super(ItsdbSkeleton, self).__init__(
self,
path,
relations=relations,
filters=filters,
applicators=applicators,
index=index,
cast=cast,
encoding=encoding
)