# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import re
import sys
import numpy as np
from ..utils import OrderedDict
from ..extern import six
from ..extern.six.moves import zip
__all__ = ['register_reader', 'register_writer', 'register_identifier',
'identify_format', 'get_reader', 'get_writer', 'read', 'write',
'get_formats']
__doctest_skip__ = ['register_identifier']
_readers = OrderedDict()
_writers = OrderedDict()
_identifiers = OrderedDict()
def _update__doc__(data_class, readwrite):
"""
Update the docstring to include all the available readers / writers for the
``data_class.read`` or ``data_class.write`` functions (respectively).
"""
FORMATS_TEXT = 'The available built-in formats are:'
# Get the existing read or write method and its docstring
class_readwrite_func = getattr(data_class, readwrite)
if not isinstance(class_readwrite_func.__doc__, six.string_types):
# No docstring--could just be test code, or possibly code compiled
# without docstrings
return
lines = class_readwrite_func.__doc__.splitlines()
# Find the location of the existing formats table if it exists
sep_indices = [ii for ii, line in enumerate(lines) if FORMATS_TEXT in line]
if sep_indices:
# Chop off the existing formats table, including the initial blank line.
chop_index = sep_indices[0]
lines = lines[:chop_index]
# Find the minimum indent, skipping the first line because it might be odd
matches = [re.search('(\S)', line) for line in lines[1:]]
left_indent = min(match.start() for match in matches if match)
# Get the available unified I/O formats for this class
format_table = get_formats(data_class)
# Include only formats that have a reader, and drop the 'Data class' column
has_readwrite = format_table[readwrite.capitalize()] == 'Yes'
format_table = format_table[has_readwrite]
format_table.remove_column('Data class')
# Get the available formats as a table, then munge the output of pformat() a bit and
# put it into the docstring.
new_lines = format_table.pformat(max_lines=-1, max_width=80)
table_rst_sep = re.sub('-', '=', new_lines[1])
new_lines[1] = table_rst_sep
new_lines.insert(0, table_rst_sep)
new_lines.append(table_rst_sep)
# Check for deprecated names and include a warning at the end.
if 'Deprecated' in format_table.colnames:
new_lines.extend(['',
'Deprecated format names like ``aastex`` will be removed in a '
'future version.',
'Use the full name (e.g. ``ascii.aastex``) instead.'])
new_lines = [FORMATS_TEXT, ''] + new_lines
lines.extend([' ' * left_indent + line for line in new_lines])
# Depending on Python version and whether class_readwrite_func is
# an instancemethod or classmethod, one of the following will work.
try:
class_readwrite_func.__doc__ = '\n'.join(lines)
except AttributeError:
class_readwrite_func.__func__.__doc__ = '\n'.join(lines)
[docs]def register_reader(data_format, data_class, function, force=False):
"""
Register a reader function.
Parameters
----------
data_format : str
The data type identifier. This is the string that will be used to
specify the data type when reading.
data_class : classobj
The class of the object that the reader produces
function : function
The function to read in a data object.
force : bool
Whether to override any existing function if already present.
"""
if not (data_format, data_class) in _readers or force:
_readers[(data_format, data_class)] = function
else:
raise Exception("Reader for format '{0}' and class '{1}' is "
'already defined'.format(data_format,
data_class.__name__))
_update__doc__(data_class, 'read')
[docs]def register_writer(data_format, data_class, function, force=False):
"""
Register a table writer function.
Parameters
----------
data_format : str
The data type identifier. This is the string that will be used to
specify the data type when writing.
data_class : classobj
The class of the object that can be written
function : function
The function to write out a data object.
force : bool
Whether to override any existing function if already present.
"""
if not (data_format, data_class) in _writers or force:
_writers[(data_format, data_class)] = function
else:
raise Exception("Writer for format '{0}' and class '{1}' is "
'already defined'.format(data_format,
data_class.__name__))
_update__doc__(data_class, 'write')
[docs]def register_identifier(data_format, data_class, identifier, force=False):
"""
Associate an identifier function with a specific data type.
Parameters
----------
data_format : str
The data type identifier. This is the string that is used to
specify the data type when reading/writing.
data_class : classobj
The class of the object that can be written
identifier : function
A function that checks the argument specified to `read` or `write` to
determine whether the input can be interpreted as a table of type
``data_format``. This function should take the following arguments:
- ``origin``: A string `read` or `write` identifying whether
the file is to be opened for reading or writing.
- ``path``: The path to the file.
- ``fileobj``: An open file object to read the file's contents, or
`None` if the file could not be opened.
- ``*args``: A list of positional arguments to the `read` or
`write` function.
- ``**kwargs``: A list of keyword arguments to the `read` or
`write` function.
One or both of ``path`` or ``fileobj`` may be `None`. If they are
both `None`, the identifier will need to work from ``args[0]``.
The function should return True if the input can be identified
as being of format ``data_format``, and False otherwise.
force : bool
Whether to override any existing function if already present.
Examples
--------
To set the identifier based on extensions, for formats that take a
filename as a first argument, you can do for example::
>>> def my_identifier(*args, **kwargs):
... return (isinstance(args[0], basestring) and
... args[0].endswith('.tbl'))
>>> register_identifier('ipac', Table, my_identifier)
"""
if not (data_format, data_class) in _identifiers or force:
_identifiers[(data_format, data_class)] = identifier
else:
raise Exception("Identifier for format '{0}' and class '{1}' is "
'already defined'.format(data_format,
data_class.__name__))
def _get_format_table_str(data_class, readwrite):
format_table = get_formats(data_class)
if len(format_table) > 0:
has_readwrite = format_table[readwrite] == 'Yes'
format_table = format_table[has_readwrite]
format_table.remove_column('Data class')
format_table_str = '\n'.join(format_table.pformat(max_lines=-1))
return format_table_str
[docs]def get_reader(data_format, data_class):
# Get all the readers that work for `data_format`
readers = [(fmt, cls) for fmt, cls in _readers if fmt == data_format]
for reader_format, reader_class in readers:
if _is_best_match(data_class, reader_class, readers):
return _readers[(reader_format, reader_class)]
else:
format_table_str = _get_format_table_str(data_class, 'Read')
raise Exception("No reader defined for format '{0}' and class '{1}'.\n"
'The available formats are:\n'
'{2}'
.format(data_format, data_class.__name__, format_table_str))
[docs]def get_writer(data_format, data_class):
writers = [(fmt, cls) for fmt, cls in _writers if fmt == data_format]
for writer_format, writer_class in writers:
if _is_best_match(data_class, writer_class, writers):
return _writers[(writer_format, writer_class)]
else:
format_table_str = _get_format_table_str(data_class, 'Write')
raise Exception("No writer defined for format '{0}' and class '{1}'.\n"
'The available formats are:\n'
'{2}'
.format(data_format, data_class.__name__, format_table_str))
[docs]def read(cls, *args, **kwargs):
"""
Read in data
The arguments passed to this method depend on the format
"""
if 'format' in kwargs:
format = kwargs.pop('format')
else:
format = None
ctx = None
try:
if format is None:
path = None
fileobj = None
if len(args):
if isinstance(args[0], six.string_types):
from ..utils.data import get_readable_fileobj
path = args[0]
try:
ctx = get_readable_fileobj(args[0], encoding='binary')
fileobj = ctx.__enter__()
except Exception:
fileobj = None
else:
args = [fileobj] + list(args[1:])
elif hasattr(args[0], 'read'):
path = None
fileobj = args[0]
format = _get_valid_format(
'read', cls, path, fileobj, args, kwargs)
reader = get_reader(format, cls)
data = reader(*args, **kwargs)
if not isinstance(data, cls):
if issubclass(cls, data.__class__):
# User has read with a subclass where only the parent class is
# registered. This returns the parent class, so try coercing to
# desired subclass.
try:
data = cls(data)
except:
raise TypeError('could not convert reader output to {0} class'
.format(cls.__name__))
else:
raise TypeError("reader should return a {0} instance".format(cls.__name__))
finally:
if ctx is not None:
ctx.__exit__(*sys.exc_info())
return data
[docs]def write(data, *args, **kwargs):
"""
Write out data
The arguments passed to this method depend on the format
"""
if 'format' in kwargs:
format = kwargs.pop('format')
else:
format = None
if format is None:
path = None
fileobj = None
if len(args):
if isinstance(args[0], six.string_types):
path = args[0]
fileobj = None
elif hasattr(args[0], 'read'):
path = None
fileobj = args[0]
format = _get_valid_format(
'write', data.__class__, path, fileobj, args, kwargs)
writer = get_writer(format, data.__class__)
writer(data, *args, **kwargs)
def _is_best_match(class1, class2, format_classes):
"""
Determine if class2 is the "best" match for class1 in the list
of classes. It is assumed that (class2 in classes) is True.
class2 is the the best match if:
- class1 is class2 => class1 was directly registered.
- OR class1 is a subclass of class2 and class1 is not in classes.
In this case the subclass will use the parent reader/writer.
"""
classes = [cls for fmt, cls in format_classes]
is_best_match = ((class1 is class2) or (issubclass(class1, class2)
and class1 not in classes))
return is_best_match
def _get_valid_format(mode, cls, path, fileobj, args, kwargs):
"""
Returns the first valid format that can be used to read/write the data in
question. Mode can be either 'read' or 'write'.
"""
if mode == 'read':
funcs = _readers
elif mode == 'write':
funcs = _writers
valid_formats = identify_format(mode, cls, path, fileobj, args, kwargs)
if len(valid_formats) == 0:
format_table_str = _get_format_table_str(cls, mode.capitalize())
raise Exception("Format could not be identified.\n"
"The available formats are:\n"
"{0}".format(format_table_str))
elif len(valid_formats) > 1:
raise Exception(
"Format is ambiguous - options are: {0}".format(
', '.join(sorted(valid_formats, key=lambda tup: tup[0]))))
return valid_formats[0]