Refs #256 - Implement class-based formats

This makes it far easier to extend Tablib with new formats.
This commit is contained in:
Claude Paroz
2019-10-05 15:03:49 +02:00
parent d21bd10908
commit f1046cd13e
19 changed files with 1005 additions and 941 deletions
+5
View File
@@ -6,6 +6,11 @@
- Dropped Python 2 support
### Improvements
- Formats can now be dynamically registered through the
`tablib.formats.registry.register` API (#256).
### Bugfixes
- Fixed a crash when exporting an empty string with the ReST format (#368)
+22 -15
View File
@@ -90,32 +90,36 @@ Tablib features a micro-framework for adding format support.
The easiest way to understand it is to use it.
So, let's define our own format, named *xxx*.
1. Write a new format interface.
From version 1.0, Tablib formats are class-based and can be dynamically
registered.
:class:`tablib.core` follows a simple pattern for automatically utilizing your format throughout Tablib.
Function names are crucial.
Example **tablib/formats/_xxx.py**: ::
1. Write your custom format class::
class MyXXXFormatClass:
title = 'xxx'
def export_set(dset):
@classmethod
def export_set(cls, dset):
....
# returns string representation of given dataset
def export_book(dbook):
@classmethod
def export_book(cls, dbook):
....
# returns string representation of given databook
def import_set(dset, in_stream):
@classmethod
def import_set(cls, dset, in_stream):
...
# populates given Dataset with given datastream
def import_book(dbook, in_stream):
@classmethod
def import_book(cls, dbook, in_stream):
...
# returns Databook instance
def detect(stream):
@classmethod
def detect(cls, stream):
...
# returns True if given stream is parsable as xxx
@@ -124,15 +128,18 @@ So, let's define our own format, named *xxx*.
If the format excludes support for an import/export mechanism (*e.g.*
:class:`csv <tablib.Dataset.csv>` excludes
:class:`Databook <tablib.Databook>` support),
simply don't define the respective functions.
simply don't define the respective class methods.
Appropriate errors will be raised.
2. Add your new format module to the :class:`tablib.formats.available` tuple.
2. Register your class::
3. Add a mock property to the :class:`Dataset <tablib.Dataset>` class with verbose `reStructured Text`_ docstring.
This alleviates IDE confusion, and allows for pretty auto-generated Sphinx_ documentation.
from tablib.formats import registry
4. Write respective :ref:`tests <testing>`.
registry.register('xxx', MyXXXFormatClass())
3. From then on, you should be able to use your new custom format as if it were
a built-in Tablib format, e.g. using ``dataset.export('xxx')`` will use the
``MyXXXFormatClass.export_set`` method.
.. _testing:
+1 -1
View File
@@ -338,7 +338,7 @@ All we have to do is add them to a :class:`Databook` object... ::
... and export to Excel just like :class:`Datasets <Dataset>`. ::
with open('students.xls', 'wb') as f:
f.write(book.xls)
f.write(book.export('xls'))
The resulting ``students.xls`` file will contain a separate spreadsheet for each :class:`Dataset` object in the :class:`Databook`.
+24 -56
View File
@@ -13,6 +13,7 @@ from copy import copy
from operator import itemgetter
from tablib import formats
from tablib.formats import registry
__title__ = 'tablib'
__author__ = 'Kenneth Reitz'
@@ -145,8 +146,6 @@ class Dataset:
"""
_formats = {}
def __init__(self, *args, **kwargs):
self._data = list(Row(arg) for arg in args)
self.__headers = None
@@ -161,8 +160,6 @@ class Dataset:
self.title = kwargs.get('title')
self._register_formats()
def __len__(self):
return self.height
@@ -232,23 +229,11 @@ class Dataset:
# Internals
# ---------
@classmethod
def _register_formats(cls):
"""Adds format properties."""
for fmt in formats.available:
try:
try:
setattr(cls, fmt.title, property(fmt.export_set, fmt.import_set))
setattr(cls, 'get_%s' % fmt.title, fmt.export_set)
setattr(cls, 'set_%s' % fmt.title, fmt.import_set)
cls._formats[fmt.title] = (fmt.export_set, fmt.import_set)
except AttributeError:
setattr(cls, fmt.title, property(fmt.export_set))
setattr(cls, 'get_%s' % fmt.title, fmt.export_set)
cls._formats[fmt.title] = (fmt.export_set, None)
def _get_in_format(self, fmt, **kwargs):
return fmt.export_set(self, **kwargs)
except AttributeError:
cls._formats[fmt.title] = (None, None)
def _set_in_format(self, fmt, *args, **kwargs):
return fmt.import_set(self, *args, **kwargs)
def _validate(self, row=None, col=None, safety=False):
"""Assures size of every row in dataset is of proper proportions."""
@@ -417,11 +402,14 @@ class Dataset:
if not format:
format = detect_format(in_stream)
export_set, import_set = self._formats.get(format, (None, None))
fmt = registry.get_format(format)
if not hasattr(fmt, 'import_set'):
raise UnsupportedFormat('Format {0} cannot be imported.'.format(format))
if not import_set:
raise UnsupportedFormat('Format {} cannot be imported.'.format(format))
import_set(self, in_stream, **kwargs)
fmt.import_set(self, in_stream, **kwargs)
return self
def export(self, format, **kwargs):
@@ -430,11 +418,11 @@ class Dataset:
:param \\*\\*kwargs: (optional) custom configuration to the format `export_set`.
"""
export_set, import_set = self._formats.get(format, (None, None))
if not export_set:
fmt = registry.get_format(format)
if not hasattr(fmt, 'export_set'):
raise UnsupportedFormat('Format {} cannot be exported.'.format(format))
return export_set(self, **kwargs)
return fmt.export_set(self, **kwargs)
# -------
# Formats
@@ -1012,16 +1000,8 @@ class Databook:
"""A book of :class:`Dataset` objects.
"""
_formats = {}
def __init__(self, sets=None):
if sets is None:
self._datasets = list()
else:
self._datasets = sets
self._register_formats()
self._datasets = sets or []
def __repr__(self):
try:
@@ -1033,21 +1013,6 @@ class Databook:
"""Removes all :class:`Dataset` objects from the :class:`Databook`."""
self._datasets = []
@classmethod
def _register_formats(cls):
"""Adds format properties."""
for fmt in formats.available:
try:
try:
setattr(cls, fmt.title, property(fmt.export_book, fmt.import_book))
cls._formats[fmt.title] = (fmt.export_book, fmt.import_book)
except AttributeError:
setattr(cls, fmt.title, property(fmt.export_book))
cls._formats[fmt.title] = (fmt.export_book, None)
except AttributeError:
cls._formats[fmt.title] = (None, None)
def sheets(self):
return self._datasets
@@ -1089,11 +1054,11 @@ class Databook:
if not format:
format = detect_format(in_stream)
export_book, import_book = self._formats.get(format, (None, None))
if not import_book:
fmt = registry.get_format(format)
if not hasattr(fmt, 'import_book'):
raise UnsupportedFormat('Format {} cannot be loaded.'.format(format))
import_book(self, in_stream, **kwargs)
fmt.import_book(self, in_stream, **kwargs)
return self
def export(self, format, **kwargs):
@@ -1102,16 +1067,16 @@ class Databook:
:param \\*\\*kwargs: (optional) custom configuration to the format `export_book`.
"""
export_book, import_book = self._formats.get(format, (None, None))
if not export_book:
fmt = registry.get_format(format)
if not hasattr(fmt, 'export_book'):
raise UnsupportedFormat('Format {} cannot be exported.'.format(format))
return export_book(self, **kwargs)
return fmt.export_book(self, **kwargs)
def detect_format(stream):
"""Return format name of given stream."""
for fmt in formats.available:
for fmt in registry.formats():
try:
if fmt.detect(stream):
return fmt.title
@@ -1149,3 +1114,6 @@ class HeadersNeeded(Exception):
class UnsupportedFormat(NotImplementedError):
"Format is not supported"
registry.register_builtins()
+73 -15
View File
@@ -1,19 +1,77 @@
""" Tablib - formats
"""
from collections import OrderedDict
from functools import partialmethod
from . import _csv as csv
from . import _dbf as dbf
from . import _df as df
from . import _html as html
from . import _jira as jira
from . import _json as json
from . import _latex as latex
from . import _ods as ods
from . import _rst as rst
from . import _tsv as tsv
from . import _xls as xls
from . import _xlsx as xlsx
from . import _yaml as yaml
from ._csv import CSVFormat
from ._dbf import DBFFormat
from ._df import DataFrameFormat
from ._html import HTMLFormat
from ._jira import JIRAFormat
from ._json import JSONFormat
from ._latex import LATEXFormat
from ._ods import ODSFormat
from ._rst import ReSTFormat
from ._tsv import TSVFormat
from ._xls import XLSFormat
from ._xlsx import XLSXFormat
from ._yaml import YAMLFormat
# xlsx before as xls (xlrd) can also read xlsx
available = (json, xlsx, xls, yaml, csv, dbf, tsv, html, jira, latex, ods, df, rst)
class Registry:
    """Ordered registry mapping format keys to format objects.

    Registering a format also installs convenience attributes on
    ``Dataset`` and ``Databook`` (see :meth:`register`).
    """

    # Class-level mapping, hence shared by every Registry instance.
    # Insertion order is the order in which formats() yields them.
    _formats = OrderedDict()

    def register(self, key, format_):
        """Register ``format_`` under ``key`` and expose it on the core classes.

        :param key: name under which the format is stored in the registry.
        :param format_: format object; its ``title`` attribute names the
            attributes installed on ``Dataset``/``Databook``.
        :raises Exception: if a ``Dataset`` property cannot be built from the
            format (e.g. it has no ``export_set``).
        """
        # Imported here rather than at module level -- presumably to avoid a
        # circular import, since tablib.core itself imports this registry.
        from tablib.core import Databook, Dataset

        # Databook.<title>: read/write property when the format can both
        # export and import books, read-only when it can only export them.
        try:
            if hasattr(format_, 'import_book'):
                book_property = property(format_.export_book, format_.import_book)
            else:
                book_property = property(format_.export_book)
            setattr(Databook, format_.title, book_property)
        except AttributeError:
            # Nothing usable for Databook: silently skip it.
            pass

        # Dataset.<title> property plus Dataset.get_<title> (and, for
        # importable formats, Dataset.set_<title>) methods.
        try:
            title = format_.title
            importable = hasattr(format_, 'import_set')
            if importable:
                set_property = property(format_.export_set, format_.import_set)
            else:
                set_property = property(format_.export_set)
            setattr(Dataset, title, set_property)
            setattr(
                Dataset,
                'get_%s' % title,
                partialmethod(Dataset._get_in_format, format_),
            )
            if importable:
                setattr(
                    Dataset,
                    'set_%s' % title,
                    partialmethod(Dataset._set_in_format, format_),
                )
        except AttributeError:
            raise Exception("Your format class should minimally implement the export_set interface.")

        self._formats[key] = format_

    def register_builtins(self):
        """Register every format shipped with Tablib."""
        # Registration ordering matters for autodetection.
        builtin_formats = (
            ('json', JSONFormat),
            # xlsx must come before xls, as xls (xlrd) can also read xlsx
            ('xlsx', XLSXFormat),
            ('xls', XLSFormat),
            ('yaml', YAMLFormat),
            ('csv', CSVFormat),
            ('tsv', TSVFormat),
            ('ods', ODSFormat),
            ('dbf', DBFFormat),
            ('html', HTMLFormat),
            ('jira', JIRAFormat),
            ('latex', LATEXFormat),
            ('df', DataFrameFormat),
            ('rst', ReSTFormat),
        )
        for key, format_class in builtin_formats:
            self.register(key, format_class())

    def formats(self):
        """Yield the registered format objects in registration order."""
        yield from self._formats.values()

    def get_format(self, key):
        """Return the format registered under ``key``.

        :raises KeyError: if no format is registered under ``key``.
        """
        return self._formats[key]
registry = Registry()
+37 -37
View File
@@ -4,54 +4,54 @@
import csv
from io import StringIO
title = 'csv'
extensions = ('csv',)
class CSVFormat:
title = 'csv'
extensions = ('csv',)
DEFAULT_DELIMITER = ','
DEFAULT_DELIMITER = ','
@classmethod
def export_stream_set(cls, dataset, **kwargs):
"""Returns CSV representation of Dataset as file-like."""
stream = StringIO()
def export_stream_set(dataset, **kwargs):
"""Returns CSV representation of Dataset as file-like."""
stream = StringIO()
kwargs.setdefault('delimiter', cls.DEFAULT_DELIMITER)
kwargs.setdefault('delimiter', DEFAULT_DELIMITER)
_csv = csv.writer(stream, **kwargs)
_csv = csv.writer(stream, **kwargs)
for row in dataset._package(dicts=False):
_csv.writerow(row)
for row in dataset._package(dicts=False):
_csv.writerow(row)
stream.seek(0)
return stream
stream.seek(0)
return stream
@classmethod
def export_set(cls, dataset, **kwargs):
"""Returns CSV representation of Dataset."""
stream = cls.export_stream_set(dataset, **kwargs)
return stream.getvalue()
@classmethod
def import_set(cls, dset, in_stream, headers=True, **kwargs):
"""Returns dataset from CSV stream."""
def export_set(dataset, **kwargs):
"""Returns CSV representation of Dataset."""
stream = export_stream_set(dataset, **kwargs)
return stream.getvalue()
dset.wipe()
kwargs.setdefault('delimiter', cls.DEFAULT_DELIMITER)
def import_set(dset, in_stream, headers=True, **kwargs):
"""Returns dataset from CSV stream."""
rows = csv.reader(StringIO(in_stream), **kwargs)
for i, row in enumerate(rows):
dset.wipe()
if (i == 0) and (headers):
dset.headers = row
elif row:
dset.append(row)
kwargs.setdefault('delimiter', DEFAULT_DELIMITER)
rows = csv.reader(StringIO(in_stream), **kwargs)
for i, row in enumerate(rows):
if (i == 0) and (headers):
dset.headers = row
elif row:
dset.append(row)
def detect(stream, delimiter=DEFAULT_DELIMITER):
"""Returns True if given stream is valid CSV."""
try:
csv.Sniffer().sniff(stream[:1024], delimiters=delimiter)
return True
except Exception:
return False
def detect(cls, stream, delimiter=None):
"""Returns True if given stream is valid CSV."""
try:
csv.Sniffer().sniff(stream[:1024], delimiters=delimiter or cls.DEFAULT_DELIMITER)
return True
except Exception:
return False
+50 -48
View File
@@ -7,61 +7,63 @@ import tempfile
from tablib.packages.dbfpy import dbf, dbfnew
from tablib.packages.dbfpy import record as dbfrecord
title = 'dbf'
extensions = ('csv',)
DEFAULT_ENCODING = 'utf-8'
class DBFFormat:
title = 'dbf'
extensions = ('csv',)
DEFAULT_ENCODING = 'utf-8'
def export_set(dataset):
"""Returns DBF representation of a Dataset"""
new_dbf = dbfnew.dbf_new()
temp_file, temp_uri = tempfile.mkstemp()
@classmethod
def export_set(cls, dataset):
"""Returns DBF representation of a Dataset"""
new_dbf = dbfnew.dbf_new()
temp_file, temp_uri = tempfile.mkstemp()
# create the appropriate fields based on the contents of the first row
first_row = dataset[0]
for fieldname, field_value in zip(dataset.headers, first_row):
if type(field_value) in [int, float]:
new_dbf.add_field(fieldname, 'N', 10, 8)
else:
new_dbf.add_field(fieldname, 'C', 80)
# create the appropriate fields based on the contents of the first row
first_row = dataset[0]
for fieldname, field_value in zip(dataset.headers, first_row):
if type(field_value) in [int, float]:
new_dbf.add_field(fieldname, 'N', 10, 8)
else:
new_dbf.add_field(fieldname, 'C', 80)
new_dbf.write(temp_uri)
new_dbf.write(temp_uri)
dbf_file = dbf.Dbf(temp_uri, readOnly=0)
for row in dataset:
record = dbfrecord.DbfRecord(dbf_file)
for fieldname, field_value in zip(dataset.headers, row):
record[fieldname] = field_value
record.store()
dbf_file = dbf.Dbf(temp_uri, readOnly=0)
for row in dataset:
record = dbfrecord.DbfRecord(dbf_file)
for fieldname, field_value in zip(dataset.headers, row):
record[fieldname] = field_value
record.store()
dbf_file.close()
dbf_stream = open(temp_uri, 'rb')
stream = io.BytesIO(dbf_stream.read())
dbf_stream.close()
os.close(temp_file)
os.remove(temp_uri)
return stream.getvalue()
dbf_file.close()
dbf_stream = open(temp_uri, 'rb')
stream = io.BytesIO(dbf_stream.read())
dbf_stream.close()
os.close(temp_file)
os.remove(temp_uri)
return stream.getvalue()
@classmethod
def import_set(cls, dset, in_stream, headers=True):
"""Returns a dataset from a DBF stream."""
def import_set(dset, in_stream, headers=True):
"""Returns a dataset from a DBF stream."""
dset.wipe()
_dbf = dbf.Dbf(io.BytesIO(in_stream))
dset.headers = _dbf.fieldNames
for record in range(_dbf.recordCount):
row = [_dbf[record][f] for f in _dbf.fieldNames]
dset.append(row)
dset.wipe()
_dbf = dbf.Dbf(io.BytesIO(in_stream))
dset.headers = _dbf.fieldNames
for record in range(_dbf.recordCount):
row = [_dbf[record][f] for f in _dbf.fieldNames]
dset.append(row)
def detect(stream):
"""Returns True if the given stream is valid DBF"""
# _dbf = dbf.Table(StringIO(stream))
try:
if type(stream) is not bytes:
stream = bytes(stream, 'utf-8')
dbf.Dbf(io.BytesIO(stream), readOnly=True)
return True
except Exception:
return False
@classmethod
def detect(cls, stream):
"""Returns True if the given stream is valid DBF"""
#_dbf = dbf.Table(StringIO(stream))
try:
if type(stream) is not bytes:
stream = bytes(stream, 'utf-8')
_dbf = dbf.Dbf(io.BytesIO(stream), readOnly=True)
return True
except Exception:
return False
+27 -26
View File
@@ -7,32 +7,33 @@ except ImportError:
DataFrame = None
title = 'df'
extensions = ('df', )
class DataFrameFormat:
title = 'df'
extensions = ('df',)
@classmethod
def detect(cls, stream):
"""Returns True if given stream is a DataFrame."""
if DataFrame is None:
return False
try:
DataFrame(stream)
return True
except ValueError:
return False
def detect(stream):
"""Returns True if given stream is a DataFrame."""
if DataFrame is None:
return False
try:
DataFrame(stream)
return True
except ValueError:
return False
@classmethod
def export_set(cls, dset, index=None):
"""Returns DataFrame representation of DataBook."""
if DataFrame is None:
raise NotImplementedError(
'DataFrame Format requires `pandas` to be installed.'
' Try `pip install tablib[pandas]`.')
dataframe = DataFrame(dset.dict, columns=dset.headers)
return dataframe
def export_set(dset, index=None):
"""Returns DataFrame representation of DataBook."""
if DataFrame is None:
raise NotImplementedError(
'DataFrame Format requires `pandas` to be installed.'
' Try `pip install tablib[pandas]`.')
dataframe = DataFrame(dset.dict, columns=dset.headers)
return dataframe
def import_set(dset, in_stream):
"""Returns dataset from DataFrame."""
dset.wipe()
dset.dict = in_stream.to_dict(orient='records')
@classmethod
def import_set(cls, dset, in_stream):
"""Returns dataset from DataFrame."""
dset.wipe()
dset.dict = in_stream.to_dict(orient='records')
+37 -35
View File
@@ -6,55 +6,57 @@ from io import BytesIO
from MarkupPy import markup
BOOK_ENDINGS = 'h3'
title = 'html'
extensions = ('html', )
class HTMLFormat:
BOOK_ENDINGS = 'h3'
title = 'html'
extensions = ('html', )
def export_set(dataset):
"""HTML representation of a Dataset."""
@classmethod
def export_set(cls, dataset):
"""HTML representation of a Dataset."""
stream = BytesIO()
stream = BytesIO()
page = markup.page()
page.table.open()
page = markup.page()
page.table.open()
if dataset.headers is not None:
new_header = [item if item is not None else '' for item in dataset.headers]
if dataset.headers is not None:
new_header = [item if item is not None else '' for item in dataset.headers]
page.thead.open()
headers = markup.oneliner.th(new_header)
page.tr(headers)
page.thead.close()
page.thead.open()
headers = markup.oneliner.th(new_header)
page.tr(headers)
page.thead.close()
for row in dataset:
new_row = [item if item is not None else '' for item in row]
for row in dataset:
new_row = [item if item is not None else '' for item in row]
html_row = markup.oneliner.td(new_row)
page.tr(html_row)
html_row = markup.oneliner.td(new_row)
page.tr(html_row)
page.table.close()
page.table.close()
# Allow unicode characters in output
wrapper = codecs.getwriter("utf8")(stream)
wrapper.writelines(str(page))
# Allow unicode characters in output
wrapper = codecs.getwriter("utf8")(stream)
wrapper.writelines(str(page))
return stream.getvalue().decode('utf-8')
return stream.getvalue().decode('utf-8')
@classmethod
def export_book(cls, databook):
"""HTML representation of a Databook."""
def export_book(databook):
"""HTML representation of a Databook."""
stream = BytesIO()
stream = BytesIO()
# Allow unicode characters in output
wrapper = codecs.getwriter("utf8")(stream)
# Allow unicode characters in output
wrapper = codecs.getwriter("utf8")(stream)
for i, dset in enumerate(databook._datasets):
title = (dset.title if dset.title else 'Set %s' % (i))
wrapper.write('<{}>{}</{}>\n'.format(cls.BOOK_ENDINGS, title, cls.BOOK_ENDINGS))
wrapper.write(dset.html)
wrapper.write('\n')
for i, dset in enumerate(databook._datasets):
title = (dset.title if dset.title else 'Set %s' % (i))
wrapper.write('<{}>{}</{}>\n'.format(BOOK_ENDINGS, title, BOOK_ENDINGS))
wrapper.write(dset.html)
wrapper.write('\n')
return stream.getvalue().decode('utf-8')
return stream.getvalue().decode('utf-8')
+27 -23
View File
@@ -3,34 +3,38 @@
Generates a Jira table from the dataset.
"""
title = 'jira'
class JIRAFormat:
title = 'jira'
def export_set(dataset):
"""Formats the dataset according to the Jira table syntax:
@classmethod
def export_set(cls, dataset):
"""Formats the dataset according to the Jira table syntax:
||heading 1||heading 2||heading 3||
|col A1|col A2|col A3|
|col B1|col B2|col B3|
||heading 1||heading 2||heading 3||
|col A1|col A2|col A3|
|col B1|col B2|col B3|
:param dataset: dataset to serialize
:type dataset: tablib.core.Dataset
"""
:param dataset: dataset to serialize
:type dataset: tablib.core.Dataset
"""
header = _get_header(dataset.headers) if dataset.headers else ''
body = _get_body(dataset)
return '{}\n{}'.format(header, body) if header else body
header = cls._get_header(dataset.headers) if dataset.headers else ''
body = cls._get_body(dataset)
return '{}\n{}'.format(header, body) if header else body
@classmethod
def _get_body(cls, dataset):
return '\n'.join([cls._serialize_row(row) for row in dataset])
def _get_body(dataset):
return '\n'.join([_serialize_row(row) for row in dataset])
@classmethod
def _get_header(cls, headers):
return cls._serialize_row(headers, delimiter='||')
def _get_header(headers):
return _serialize_row(headers, delimiter='||')
def _serialize_row(row, delimiter='|'):
return '{}{}{}'.format(delimiter,
delimiter.join([str(item) if item else ' ' for item in row]),
delimiter)
@classmethod
def _serialize_row(cls, row, delimiter='|'):
return '{}{}{}'.format(
delimiter,
delimiter.join([str(item) if item else ' ' for item in row]),
delimiter
)
+33 -31
View File
@@ -6,9 +6,6 @@ from uuid import UUID
import tablib
title = 'json'
extensions = ('json', 'jsn')
def serialize_objects_handler(obj):
if isinstance(obj, (decimal.Decimal, UUID)):
@@ -19,38 +16,43 @@ def serialize_objects_handler(obj):
return obj
def export_set(dataset):
"""Returns JSON representation of Dataset."""
return json.dumps(dataset.dict, default=serialize_objects_handler)
class JSONFormat:
title = 'json'
extensions = ('json', 'jsn')
@classmethod
def export_set(cls, dataset):
"""Returns JSON representation of Dataset."""
return json.dumps(dataset.dict, default=serialize_objects_handler)
def export_book(databook):
"""Returns JSON representation of Databook."""
return json.dumps(databook._package(), default=serialize_objects_handler)
@classmethod
def export_book(cls, databook):
"""Returns JSON representation of Databook."""
return json.dumps(databook._package(), default=serialize_objects_handler)
@classmethod
def import_set(cls, dset, in_stream):
"""Returns dataset from JSON stream."""
def import_set(dset, in_stream):
"""Returns dataset from JSON stream."""
dset.wipe()
dset.dict = json.loads(in_stream)
dset.wipe()
dset.dict = json.loads(in_stream)
@classmethod
def import_book(cls, dbook, in_stream):
"""Returns databook from JSON stream."""
dbook.wipe()
for sheet in json.loads(in_stream):
data = tablib.Dataset()
data.title = sheet['title']
data.dict = sheet['data']
dbook.add_sheet(data)
def import_book(dbook, in_stream):
"""Returns databook from JSON stream."""
dbook.wipe()
for sheet in json.loads(in_stream):
data = tablib.Dataset()
data.title = sheet['title']
data.dict = sheet['data']
dbook.add_sheet(data)
def detect(stream):
"""Returns True if given stream is valid JSON."""
try:
json.loads(stream)
return True
except (TypeError, ValueError):
return False
@classmethod
def detect(cls, stream):
"""Returns True if given stream is valid JSON."""
try:
json.loads(stream)
return True
except (TypeError, ValueError):
return False
+90 -88
View File
@@ -4,10 +4,12 @@
"""
import re
title = 'latex'
extensions = ('tex',)
TABLE_TEMPLATE = """\
class LATEXFormat:
title = 'latex'
extensions = ('tex',)
TABLE_TEMPLATE = """\
%% Note: add \\usepackage{booktabs} to your preamble
%%
\\begin{table}[!htbp]
@@ -23,108 +25,108 @@ TABLE_TEMPLATE = """\
\\end{table}
"""
TEX_RESERVED_SYMBOLS_MAP = dict([
('\\', '\\textbackslash{}'),
('{', '\\{'),
('}', '\\}'),
('$', '\\$'),
('&', '\\&'),
('#', '\\#'),
('^', '\\textasciicircum{}'),
('_', '\\_'),
('~', '\\textasciitilde{}'),
('%', '\\%'),
])
TEX_RESERVED_SYMBOLS_MAP = dict([
('\\', '\\textbackslash{}'),
('{', '\\{'),
('}', '\\}'),
('$', '\\$'),
('&', '\\&'),
('#', '\\#'),
('^', '\\textasciicircum{}'),
('_', '\\_'),
('~', '\\textasciitilde{}'),
('%', '\\%'),
])
TEX_RESERVED_SYMBOLS_RE = re.compile(
'(%s)' % '|'.join(map(re.escape, TEX_RESERVED_SYMBOLS_MAP.keys())))
TEX_RESERVED_SYMBOLS_RE = re.compile(
'(%s)' % '|'.join(map(re.escape, TEX_RESERVED_SYMBOLS_MAP.keys())))
@classmethod
def export_set(cls, dataset):
"""Returns LaTeX representation of dataset
def export_set(dataset):
"""Returns LaTeX representation of dataset
:param dataset: dataset to serialize
:type dataset: tablib.core.Dataset
"""
:param dataset: dataset to serialize
:type dataset: tablib.core.Dataset
"""
caption = '\\caption{%s}' % dataset.title if dataset.title else '%'
colspec = cls._colspec(dataset.width)
header = cls._serialize_row(dataset.headers) if dataset.headers else ''
midrule = cls._midrule(dataset.width)
body = '\n'.join([cls._serialize_row(row) for row in dataset])
return cls.TABLE_TEMPLATE % dict(CAPTION=caption, COLSPEC=colspec,
HEADER=header, MIDRULE=midrule, BODY=body)
caption = '\\caption{%s}' % dataset.title if dataset.title else '%'
colspec = _colspec(dataset.width)
header = _serialize_row(dataset.headers) if dataset.headers else ''
midrule = _midrule(dataset.width)
body = '\n'.join([_serialize_row(row) for row in dataset])
return TABLE_TEMPLATE % dict(CAPTION=caption, COLSPEC=colspec,
HEADER=header, MIDRULE=midrule, BODY=body)
@classmethod
def _colspec(cls, dataset_width):
"""Generates the column specification for the LaTeX `tabular` environment
based on the dataset width.
The first column is justified to the left, all further columns are aligned
to the right.
def _colspec(dataset_width):
"""Generates the column specification for the LaTeX `tabular` environment
based on the dataset width.
.. note:: This is only a heuristic and most probably has to be fine-tuned
post export. Column alignment should depend on the data type, e.g., textual
content should usually be aligned to the left while numeric content almost
always should be aligned to the right.
The first column is justified to the left, all further columns are aligned
to the right.
:param dataset_width: width of the dataset
"""
.. note:: This is only a heuristic and most probably has to be fine-tuned
post export. Column alignment should depend on the data type, e.g., textual
content should usually be aligned to the left while numeric content almost
always should be aligned to the right.
spec = 'l'
for _ in range(1, dataset_width):
spec += 'r'
return spec
:param dataset_width: width of the dataset
"""
@classmethod
def _midrule(cls, dataset_width):
"""Generates the table `midrule`, which may be composed of several
`cmidrules`.
spec = 'l'
for _ in range(1, dataset_width):
spec += 'r'
return spec
:param dataset_width: width of the dataset to serialize
"""
if not dataset_width or dataset_width == 1:
return '\\midrule'
return ' '.join([cls._cmidrule(colindex, dataset_width) for colindex in
range(1, dataset_width + 1)])
def _midrule(dataset_width):
"""Generates the table `midrule`, which may be composed of several
`cmidrules`.
@classmethod
def _cmidrule(cls, colindex, dataset_width):
"""Generates the `cmidrule` for a single column with appropriate trimming
based on the column position.
:param dataset_width: width of the dataset to serialize
"""
:param colindex: Column index
:param dataset_width: width of the dataset
"""
if not dataset_width or dataset_width == 1:
return '\\midrule'
return ' '.join([_cmidrule(colindex, dataset_width) for colindex in
range(1, dataset_width + 1)])
rule = '\\cmidrule(%s){%d-%d}'
if colindex == 1:
# Rule of first column is trimmed on the right
return rule % ('r', colindex, colindex)
if colindex == dataset_width:
# Rule of last column is trimmed on the left
return rule % ('l', colindex, colindex)
# Inner columns are trimmed on the left and right
return rule % ('lr', colindex, colindex)
@classmethod
def _serialize_row(cls, row):
"""Returns string representation of a single row.
def _cmidrule(colindex, dataset_width):
"""Generates the `cmidrule` for a single column with appropriate trimming
based on the column position.
:param row: single dataset row
"""
:param colindex: Column index
:param dataset_width: width of the dataset
"""
new_row = [cls._escape_tex_reserved_symbols(str(item)) if item else ''
for item in row]
return 6 * ' ' + ' & '.join(new_row) + ' \\\\'
rule = '\\cmidrule(%s){%d-%d}'
if colindex == 1:
# Rule of first column is trimmed on the right
return rule % ('r', colindex, colindex)
if colindex == dataset_width:
# Rule of last column is trimmed on the left
return rule % ('l', colindex, colindex)
# Inner columns are trimmed on the left and right
return rule % ('lr', colindex, colindex)
@classmethod
def _escape_tex_reserved_symbols(cls, input):
"""Escapes all TeX reserved symbols ('_', '~', etc.) in a string.
def _serialize_row(row):
"""Returns string representation of a single row.
:param row: single dataset row
"""
new_row = [_escape_tex_reserved_symbols(str(item)) if item else '' for
item in row]
return 6 * ' ' + ' & '.join(new_row) + ' \\\\'
def _escape_tex_reserved_symbols(input):
"""Escapes all TeX reserved symbols ('_', '~', etc.) in a string.
:param input: String to escape
"""
def replace(match):
return TEX_RESERVED_SYMBOLS_MAP[match.group()]
return TEX_RESERVED_SYMBOLS_RE.sub(replace, input)
:param input: String to escape
"""
def replace(match):
return cls.TEX_RESERVED_SYMBOLS_MAP[match.group()]
return cls.TEX_RESERVED_SYMBOLS_RE.sub(replace, input)
+76 -74
View File
@@ -5,99 +5,101 @@ from io import BytesIO
from odf import opendocument, style, table, text
title = 'ods'
extensions = ('ods',)
bold = style.Style(name="bold", family="paragraph")
bold.addElement(style.TextProperties(fontweight="bold", fontweightasian="bold", fontweightcomplex="bold"))
def export_set(dataset):
"""Returns ODF representation of Dataset."""
class ODSFormat:
title = 'ods'
extensions = ('ods',)
wb = opendocument.OpenDocumentSpreadsheet()
wb.automaticstyles.addElement(bold)
@classmethod
def export_set(cls, dataset):
"""Returns ODF representation of Dataset."""
ws = table.Table(name=dataset.title if dataset.title else 'Tablib Dataset')
wb.spreadsheet.addElement(ws)
dset_sheet(dataset, ws)
wb = opendocument.OpenDocumentSpreadsheet()
wb.automaticstyles.addElement(bold)
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
def export_book(databook):
"""Returns ODF representation of DataBook."""
wb = opendocument.OpenDocumentSpreadsheet()
wb.automaticstyles.addElement(bold)
for i, dset in enumerate(databook._datasets):
ws = table.Table(name=dset.title if dset.title else 'Sheet%s' % (i))
ws = table.Table(name=dataset.title if dataset.title else 'Tablib Dataset')
wb.spreadsheet.addElement(ws)
dset_sheet(dset, ws)
cls.dset_sheet(dataset, ws)
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
@classmethod
def export_book(cls, databook):
"""Returns ODF representation of DataBook."""
def dset_sheet(dataset, ws):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
wb = opendocument.OpenDocumentSpreadsheet()
wb.automaticstyles.addElement(bold)
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
for i, dset in enumerate(databook._datasets):
ws = table.Table(name=dset.title if dset.title else 'Sheet%s' % (i))
wb.spreadsheet.addElement(ws)
cls.dset_sheet(dset, ws)
for i, row in enumerate(_package):
row_number = i + 1
odf_row = table.TableRow(stylename=bold, defaultcellstylename='bold')
for j, col in enumerate(row):
try:
col = str(col, errors='ignore')
except TypeError:
## col is already str
pass
ws.addElement(table.TableColumn())
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
# bold headers
if (row_number == 1) and dataset.headers:
odf_row.setAttribute('stylename', bold)
ws.addElement(odf_row)
cell = table.TableCell()
p = text.P()
p.addElement(text.Span(text=col, stylename=bold))
cell.addElement(p)
odf_row.addElement(cell)
@classmethod
def dset_sheet(cls, dataset, ws):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
# wrap the rest
else:
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
for i, row in enumerate(_package):
row_number = i + 1
odf_row = table.TableRow(stylename=bold, defaultcellstylename='bold')
for j, col in enumerate(row):
try:
if '\n' in col:
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
odf_row.addElement(cell)
else:
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
odf_row.addElement(cell)
col = str(col, errors='ignore')
except TypeError:
## col is already str
pass
ws.addElement(table.TableColumn())
# bold headers
if (row_number == 1) and dataset.headers:
odf_row.setAttribute('stylename', bold)
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
p = text.P()
p.addElement(text.Span(text=col, stylename=bold))
cell.addElement(p)
odf_row.addElement(cell)
# wrap the rest
else:
try:
if '\n' in col:
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
odf_row.addElement(cell)
else:
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
odf_row.addElement(cell)
except TypeError:
ws.addElement(odf_row)
cell = table.TableCell()
cell.addElement(text.P(text=col))
odf_row.addElement(cell)
def detect(stream):
    """Return True when ``opendocument.load`` accepts *stream*, else False."""
    # load() expects a file-like object, so raw bytes are wrapped first.
    source = BytesIO(stream) if isinstance(stream, bytes) else stream
    try:
        opendocument.load(source)
    except Exception:
        # Any failure to load means the stream is not treated as ODS.
        return False
    return True
@classmethod
def detect(cls, stream):
    """Return True when ``opendocument.load`` accepts *stream*, else False."""
    if isinstance(stream, bytes):
        # load() expects a file-like object, not raw bytes.
        candidate = BytesIO(stream)
    else:
        candidate = stream
    try:
        opendocument.load(candidate)
    except Exception:
        # Any failure to load means the stream is not treated as ODS.
        return False
    else:
        return True
+217 -213
View File
@@ -5,13 +5,6 @@ from itertools import zip_longest
from statistics import median
from textwrap import TextWrapper
title = 'rst'
extensions = ('rst',)
MAX_TABLE_WIDTH = 80 # Roughly. It may be wider to avoid breaking words.
JUSTIFY_LEFT = 'left'
JUSTIFY_CENTER = 'center'
JUSTIFY_RIGHT = 'right'
@@ -28,236 +21,247 @@ def _max_word_len(text):
"""
Return the length of the longest word in `text`.
>>> _max_word_len('Python Module for Tabular Datasets')
8
"""
return max(len(word) for word in text.split()) if text else 0
def _get_column_string_lengths(dataset):
    """
    Collect per-column text statistics for `dataset`.

    Returns a pair ``(column_lengths, word_lens)``: for every column, the
    list of string lengths of its cells (header first, when present), and
    the length of the longest single word seen in that column.
    """
    headers = dataset.headers
    if headers:
        lengths_by_column = [[len(title)] for title in headers]
        longest_words = [_max_word_len(title) for title in headers]
    else:
        column_count = dataset.width
        lengths_by_column = [[] for _ in range(column_count)]
        longest_words = [0] * column_count

    for record in dataset.dict:
        # Rows may be mappings (use their values) or plain sequences.
        cells = record.values() if hasattr(record, 'values') else record
        for index, cell in enumerate(cells):
            cell_text = to_str(cell)
            lengths_by_column[index].append(len(cell_text))
            longest_words[index] = max(
                longest_words[index], _max_word_len(cell_text)
            )
    return lengths_by_column, longest_words
class ReSTFormat:
title = 'rst'
extensions = ('rst',)
MAX_TABLE_WIDTH = 80 # Roughly. It may be wider to avoid breaking words.
@classmethod
def _get_column_string_lengths(cls, dataset):
    """
    Collect per-column text statistics for `dataset`.

    Returns a pair ``(column_lengths, word_lens)``: for every column, the
    list of string lengths of its cells (header first, when present), and
    the length of the longest single word seen in that column.
    """
    titles = dataset.headers
    if titles:
        per_column = [[len(title)] for title in titles]
        widest_word = [_max_word_len(title) for title in titles]
    else:
        n_columns = dataset.width
        per_column = [[] for _ in range(n_columns)]
        widest_word = [0] * n_columns

    for entry in dataset.dict:
        # Rows may be mappings (use their values) or plain sequences.
        cells = entry.values() if hasattr(entry, 'values') else entry
        for position, raw in enumerate(cells):
            rendered = to_str(raw)
            per_column[position].append(len(rendered))
            widest_word[position] = max(
                widest_word[position], _max_word_len(rendered)
            )
    return per_column, widest_word
@classmethod
def _row_to_lines(cls, values, widths, wrapper, sep='|', justify=JUSTIFY_LEFT):
    """
    Render one table row as a list of text lines.

    Each value is converted with ``to_str`` and wrapped by `wrapper` to its
    column width (``wrapper.width`` is reassigned for every cell). The
    wrapped cells are then laid side by side, padded according to
    `justify`, and joined with `sep`.

    Raises ValueError when `justify` is not one of ``JUSTIFY_VALUES``.
    """
    if justify not in JUSTIFY_VALUES:
        raise ValueError('Value of "justify" must be one of "{}"'.format(
            '", "'.join(JUSTIFY_VALUES)
        ))

    # Pick the padding routine once instead of re-testing per cell.
    if justify == JUSTIFY_LEFT:
        align = str.ljust
    elif justify == JUSTIFY_CENTER:
        align = str.center
    else:
        align = str.rjust

    left_edge = sep + ' ' if sep else ''
    right_edge = ' ' + sep if sep else ''
    joiner = ' ' + sep + ' '

    wrapped_cells = []
    for value, width in zip(values, widths):
        wrapper.width = width
        wrapped_cells.append(wrapper.wrap(to_str(value)))

    rendered = []
    # Shorter cells are filled with '' so every physical line has all columns.
    for fragments in zip_longest(*wrapped_cells, fillvalue=''):
        padded = [
            align(fragment, widths[index])
            for index, fragment in enumerate(fragments)
        ]
        rendered.append(''.join((left_edge, joiner.join(padded), right_edge)))
    return rendered
def _row_to_lines(values, widths, wrapper, sep='|', justify=JUSTIFY_LEFT):
    """
    Render one table row as a list of text lines.

    Each value is converted with ``to_str`` and wrapped by `wrapper` to its
    column width (``wrapper.width`` is reassigned for every cell). The
    wrapped cells are then laid side by side, padded according to
    `justify`, and joined with `sep`.

    Raises ValueError when `justify` is not one of ``JUSTIFY_VALUES``.
    """
    if justify not in JUSTIFY_VALUES:
        raise ValueError('Value of "justify" must be one of "{}"'.format(
            '", "'.join(JUSTIFY_VALUES)
        ))

    # Pick the padding routine once instead of re-testing per cell.
    if justify == JUSTIFY_LEFT:
        pad_text = str.ljust
    elif justify == JUSTIFY_CENTER:
        pad_text = str.center
    else:
        pad_text = str.rjust

    prefix = sep + ' ' if sep else ''
    suffix = ' ' + sep if sep else ''
    between = ' ' + sep + ' '

    columns = []
    for value, width in zip(values, widths):
        wrapper.width = width
        columns.append(wrapper.wrap(to_str(value)))

    output = []
    # Shorter cells are filled with '' so every physical line has all columns.
    for pieces in zip_longest(*columns, fillvalue=''):
        aligned = [pad_text(piece, widths[i]) for i, piece in enumerate(pieces)]
        output.append(''.join((prefix, between.join(aligned), suffix)))
    return output
@classmethod
def _get_column_widths(cls, dataset, max_table_width=MAX_TABLE_WIDTH, pad_len=3):
    """
    Returns a list of column widths proportional to the median length
    of the text in their cells.

    `max_table_width` is the target overall width and `pad_len` the number
    of characters reserved per column for separator and padding. A column
    is never made narrower than its longest word, so the resulting table
    may exceed `max_table_width`.
    """
    str_lens, word_lens = cls._get_column_string_lengths(dataset)
    median_lens = [int(median(lens)) for lens in str_lens]
    total = sum(median_lens)
    # Scale down only when there is something to scale. With many columns
    # whose median length is 0 the width budget below goes negative while
    # `total` is 0, and dividing by it would raise ZeroDivisionError.
    if total and total > max_table_width - (pad_len * len(median_lens)):
        column_widths = [
            max_table_width * length // total for length in median_lens
        ]
    else:
        column_widths = median_lens
    # Allow for separator and padding:
    column_widths = [w - pad_len if w > pad_len else w for w in column_widths]
    # Rather widen table than break words:
    return [max(w, word_len) for w, word_len in zip(column_widths, word_lens)]
@classmethod
def export_set_as_simple_table(cls, dataset, column_widths=None):
    """
    Returns reStructuredText simple table representation of dataset.

    When `column_widths` is None the widths are computed from the dataset
    with two characters of padding per column.
    """
    lines = []
    wrapper = TextWrapper()
    if column_widths is None:
        # The width helper is a classmethod here, so it must be reached
        # through `cls`; the bare name is not defined at module level.
        column_widths = cls._get_column_widths(dataset, pad_len=2)
    # Two spaces between column rules, matching the two-character gap that
    # _row_to_lines puts between cells when sep=''.
    border = '  '.join(['=' * w for w in column_widths])
def _get_column_widths(dataset, max_table_width=MAX_TABLE_WIDTH, pad_len=3):
    """
    Returns a list of column widths proportional to the median length
    of the text in their cells.

    `max_table_width` is the target overall width and `pad_len` the number
    of characters reserved per column for separator and padding. A column
    is never made narrower than its longest word, so the resulting table
    may exceed `max_table_width`.
    """
    str_lens, word_lens = _get_column_string_lengths(dataset)
    median_lens = [int(median(lens)) for lens in str_lens]
    total = sum(median_lens)
    budget = max_table_width - (pad_len * len(median_lens))
    # `total` can be 0 while `budget` is negative (many columns of empty
    # text); skip the proportional scaling then, since it divides by total.
    if total and total > budget:
        column_widths = [
            max_table_width * length // total for length in median_lens
        ]
    else:
        column_widths = median_lens
    # Allow for separator and padding:
    column_widths = [w - pad_len if w > pad_len else w for w in column_widths]
    # Rather widen table than break words:
    return [max(w, word_len) for w, word_len in zip(column_widths, word_lens)]
def export_set_as_simple_table(dataset, column_widths=None):
    """
    Returns reStructuredText simple table representation of dataset.

    When `column_widths` is None the widths are computed from the dataset
    with two characters of padding per column. The header row, if any, is
    centered and followed by a second border line.
    """
    wrapper = TextWrapper()
    if column_widths is None:
        column_widths = _get_column_widths(dataset, pad_len=2)
    # Two spaces between column rules, matching the two-character gap that
    # _row_to_lines puts between cells when sep=''.
    border = '  '.join('=' * w for w in column_widths)

    lines = [border]
    if dataset.headers:
        lines.extend(_row_to_lines(
            dataset.headers,
            column_widths,
            wrapper,
            sep='',
            justify=JUSTIFY_CENTER,
        ))
        lines.append(border)
    for row in dataset.dict:
        # Rows may be mappings (use their values) or plain sequences.
        values = iter(row.values() if hasattr(row, 'values') else row)
        lines.extend(_row_to_lines(values, column_widths, wrapper, ''))
    lines.append(border)
    return '\n'.join(lines)
if dataset.headers:
lines.extend(cls._row_to_lines(
dataset.headers,
column_widths,
wrapper,
sep='',
justify=JUSTIFY_CENTER,
))
lines.append(border)
for row in dataset.dict:
values = iter(row.values() if hasattr(row, 'values') else row)
lines.extend(cls._row_to_lines(values, column_widths, wrapper, ''))
lines.append(border)
return '\n'.join(lines)
@classmethod
def export_set_as_grid_table(cls, dataset, column_widths=None):
"""
Returns reStructuredText grid table representation of dataset.
def export_set_as_grid_table(dataset, column_widths=None):
"""
Returns reStructuredText grid table representation of dataset.
>>> from tablib import Dataset
>>> from tablib.formats import registry
>>> bits = ((0, 0), (1, 0), (0, 1), (1, 1))
>>> data = Dataset()
>>> data.headers = ['A', 'B', 'A and B']
>>> for a, b in bits:
... data.append([bool(a), bool(b), bool(a * b)])
>>> rst = registry.get_format('rst')
>>> print(rst.export_set(data, force_grid=True))
+-------+-------+-------+
| A | B | A and |
| | | B |
+=======+=======+=======+
| False | False | False |
+-------+-------+-------+
| True | False | False |
+-------+-------+-------+
| False | True | False |
+-------+-------+-------+
| True | True | True |
+-------+-------+-------+
"""
lines = []
wrapper = TextWrapper()
if column_widths is None:
column_widths = cls._get_column_widths(dataset)
header_sep = '+=' + '=+='.join(['=' * w for w in column_widths]) + '=+'
row_sep = '+-' + '-+-'.join(['-' * w for w in column_widths]) + '-+'
>>> from tablib import Dataset
>>> from tablib.formats import rst
>>> bits = ((0, 0), (1, 0), (0, 1), (1, 1))
>>> data = Dataset()
>>> data.headers = ['A', 'B', 'A and B']
>>> for a, b in bits:
... data.append([bool(a), bool(b), bool(a * b)])
>>> print(rst.export_set(data, force_grid=True))
+-------+-------+-------+
| A | B | A and |
| | | B |
+=======+=======+=======+
| False | False | False |
+-------+-------+-------+
| True | False | False |
+-------+-------+-------+
| False | True | False |
+-------+-------+-------+
| True | True | True |
+-------+-------+-------+
"""
lines = []
wrapper = TextWrapper()
if column_widths is None:
column_widths = _get_column_widths(dataset)
header_sep = '+=' + '=+='.join(['=' * w for w in column_widths]) + '=+'
row_sep = '+-' + '-+-'.join(['-' * w for w in column_widths]) + '-+'
lines.append(row_sep)
if dataset.headers:
lines.extend(_row_to_lines(
dataset.headers,
column_widths,
wrapper,
justify=JUSTIFY_CENTER,
))
lines.append(header_sep)
for row in dataset.dict:
values = iter(row.values() if hasattr(row, 'values') else row)
lines.extend(_row_to_lines(values, column_widths, wrapper))
lines.append(row_sep)
return '\n'.join(lines)
if dataset.headers:
lines.extend(cls._row_to_lines(
dataset.headers,
column_widths,
wrapper,
justify=JUSTIFY_CENTER,
))
lines.append(header_sep)
for row in dataset.dict:
values = iter(row.values() if hasattr(row, 'values') else row)
lines.extend(cls._row_to_lines(values, column_widths, wrapper))
lines.append(row_sep)
return '\n'.join(lines)
def _use_simple_table(head0, col0, width0):
"""
Use a simple table if the text in the first column is never wrapped
@classmethod
def _use_simple_table(cls, head0, col0, width0):
"""
Use a simple table if the text in the first column is never wrapped
>>> _use_simple_table('menu', ['egg', 'bacon'], 10)
True
>>> _use_simple_table(None, ['lobster thermidor', 'spam'], 10)
False
>>> from tablib.formats import registry
>>> rst = registry.get_format('rst')
>>> rst._use_simple_table('menu', ['egg', 'bacon'], 10)
True
>>> rst._use_simple_table(None, ['lobster thermidor', 'spam'], 10)
False
"""
if head0 is not None:
head0 = to_str(head0)
if len(head0) > width0:
return False
for cell in col0:
cell = to_str(cell)
if len(cell) > width0:
return False
return True
"""
if head0 is not None:
head0 = to_str(head0)
if len(head0) > width0:
return False
for cell in col0:
cell = to_str(cell)
if len(cell) > width0:
return False
return True
@classmethod
def export_set(cls, dataset, **kwargs):
"""
Returns reStructuredText table representation of dataset.
Returns a simple table if the text in the first column is never
wrapped, otherwise returns a grid table.
def export_set(dataset, **kwargs):
"""
Returns reStructuredText table representation of dataset.
>>> from tablib import Dataset
>>> bits = ((0, 0), (1, 0), (0, 1), (1, 1))
>>> data = Dataset()
>>> data.headers = ['A', 'B', 'A and B']
>>> for a, b in bits:
... data.append([bool(a), bool(b), bool(a * b)])
>>> table = data.rst
>>> table.split('\\n') == [
... '===== ===== =====',
... ' A B A and',
... ' B ',
... '===== ===== =====',
... 'False False False',
... 'True False False',
... 'False True False',
... 'True True True ',
... '===== ===== =====',
... ]
True
Returns a simple table if the text in the first column is never
wrapped, otherwise returns a grid table.
"""
if not dataset.dict:
return ''
force_grid = kwargs.get('force_grid', False)
max_table_width = kwargs.get('max_table_width', cls.MAX_TABLE_WIDTH)
column_widths = cls._get_column_widths(dataset, max_table_width)
use_simple_table = cls._use_simple_table(
dataset.headers[0] if dataset.headers else None,
dataset.get_col(0),
column_widths[0],
)
if use_simple_table and not force_grid:
return cls.export_set_as_simple_table(dataset, column_widths)
else:
return cls.export_set_as_grid_table(dataset, column_widths)
>>> from tablib import Dataset
>>> bits = ((0, 0), (1, 0), (0, 1), (1, 1))
>>> data = Dataset()
>>> data.headers = ['A', 'B', 'A and B']
>>> for a, b in bits:
... data.append([bool(a), bool(b), bool(a * b)])
>>> table = data.rst
>>> table.split('\\n') == [
... '===== ===== =====',
... ' A B A and',
... ' B ',
... '===== ===== =====',
... 'False False False',
... 'True False False',
... 'False True False',
... 'True True True ',
... '===== ===== =====',
... ]
True
@classmethod
def export_book(cls, databook):
"""
reStructuredText representation of a Databook.
"""
if not dataset.dict:
return ''
force_grid = kwargs.get('force_grid', False)
max_table_width = kwargs.get('max_table_width', MAX_TABLE_WIDTH)
column_widths = _get_column_widths(dataset, max_table_width)
use_simple_table = _use_simple_table(
dataset.headers[0] if dataset.headers else None,
dataset.get_col(0),
column_widths[0],
)
if use_simple_table and not force_grid:
return export_set_as_simple_table(dataset, column_widths)
else:
return export_set_as_grid_table(dataset, column_widths)
def export_book(databook):
"""
reStructuredText representation of a Databook.
Tables are separated by a blank line. All tables use the grid
format.
"""
return '\n\n'.join(export_set(dataset, force_grid=True)
for dataset in databook._datasets)
Tables are separated by a blank line. All tables use the grid
format.
"""
return '\n\n'.join(cls.export_set(dataset, force_grid=True)
for dataset in databook._datasets)
+5 -20
View File
@@ -1,26 +1,11 @@
""" Tablib - TSV (Tab Separated Values) Support.
"""
from tablib.formats._csv import detect as detect_wrapper
from tablib.formats._csv import export_set as export_set_wrapper
from tablib.formats._csv import import_set as import_set_wrapper
title = 'tsv'
extensions = ('tsv',)
DELIMITER = '\t'
from ._csv import CSVFormat
def export_set(dataset):
    """Export `dataset` through the CSV exporter with the TSV delimiter."""
    tsv_output = export_set_wrapper(dataset, delimiter=DELIMITER)
    return tsv_output
class TSVFormat(CSVFormat):
title = 'tsv'
extensions = ('tsv',)
def import_set(dset, in_stream, headers=True):
    """Import `in_stream` through the CSV importer with the TSV delimiter."""
    outcome = import_set_wrapper(
        dset,
        in_stream,
        headers=headers,
        delimiter=DELIMITER,
    )
    return outcome
def detect(stream):
    """Run the CSV detector on `stream` with the TSV delimiter."""
    verdict = detect_wrapper(stream, delimiter=DELIMITER)
    return verdict
DEFAULT_DELIMITER = '\t'
+107 -104
View File
@@ -7,130 +7,133 @@ import tablib
import xlrd
import xlwt
title = 'xls'
extensions = ('xls',)
# special styles
wrap = xlwt.easyxf("alignment: wrap on")
bold = xlwt.easyxf("font: bold on")
def detect(stream):
    """Return True when xlrd can open `stream` as a workbook.

    Three interpretations are tried in order: `stream` as raw file
    contents, the result of ``stream.read()`` as file contents, and
    `stream` as a filename. The first that opens wins.
    """
    def _opens(build_kwargs):
        # The kwargs are built inside the try so that a failing
        # ``stream.read()`` counts as a failed attempt, not an error.
        try:
            xlrd.open_workbook(**build_kwargs())
        except Exception:
            return False
        return True

    return (
        _opens(lambda: {'file_contents': stream})
        or _opens(lambda: {'file_contents': stream.read()})
        or _opens(lambda: {'filename': stream})
    )
class XLSFormat:
title = 'xls'
extensions = ('xls',)
@classmethod
def detect(cls, stream):
    """Return True when xlrd can open `stream` as a workbook.

    Three interpretations are tried in order: `stream` as raw file
    contents, the result of ``stream.read()`` as file contents, and
    `stream` as a filename. The first that opens wins.
    """
    attempts = (
        lambda: xlrd.open_workbook(file_contents=stream),
        lambda: xlrd.open_workbook(file_contents=stream.read()),
        lambda: xlrd.open_workbook(filename=stream),
    )
    for attempt in attempts:
        try:
            attempt()
        except Exception:
            # This interpretation failed; fall through to the next one.
            continue
        return True
    return False
@classmethod
def export_set(cls, dataset):
    """Return the bytes of an XLS workbook holding `dataset` on one sheet.

    The sheet is named after the dataset title, or 'Tablib Dataset' when
    the title is empty.
    """
    workbook = xlwt.Workbook(encoding='utf8')
    sheet_name = dataset.title or 'Tablib Dataset'
    worksheet = workbook.add_sheet(sheet_name)
    cls.dset_sheet(dataset, worksheet)

    buffer = BytesIO()
    workbook.save(buffer)
    return buffer.getvalue()
@classmethod
def export_book(cls, databook):
    """Return the bytes of an XLS workbook with one sheet per dataset.

    Each sheet is named after its dataset title, or 'Sheet<index>' when
    the title is empty.
    """
    workbook = xlwt.Workbook(encoding='utf8')
    for index, dset in enumerate(databook._datasets):
        sheet_name = dset.title or 'Sheet%s' % (index)
        cls.dset_sheet(dset, workbook.add_sheet(sheet_name))

    buffer = BytesIO()
    workbook.save(buffer)
    return buffer.getvalue()
def export_set(dataset):
"""Returns XLS representation of Dataset."""
@classmethod
def import_set(cls, dset, in_stream, headers=True):
"""Returns databook from XLS stream."""
wb = xlwt.Workbook(encoding='utf8')
ws = wb.add_sheet(dataset.title if dataset.title else 'Tablib Dataset')
dset.wipe()
dset_sheet(dataset, ws)
xls_book = xlrd.open_workbook(file_contents=in_stream)
sheet = xls_book.sheet_by_index(0)
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
def export_book(databook):
    """Return the bytes of an XLS workbook with one sheet per dataset.

    Each sheet is named after its dataset title, or 'Sheet<index>' when
    the title is empty.
    """
    book = xlwt.Workbook(encoding='utf8')
    for position, dset in enumerate(databook._datasets):
        name = dset.title or 'Sheet%s' % (position)
        sheet = book.add_sheet(name)
        dset_sheet(dset, sheet)

    out = BytesIO()
    book.save(out)
    return out.getvalue()
def import_set(dset, in_stream, headers=True):
    """Populate `dset` from the first sheet of an XLS stream.

    `dset` is wiped first and nothing is returned. `in_stream` is passed
    to xlrd as the raw file contents. When `headers` is true, the first
    row becomes the dataset headers instead of a data row. Cells of type
    ``XL_CELL_ERROR`` are replaced by their text from
    ``xlrd.error_text_from_code``.
    """
    dset.wipe()

    xls_book = xlrd.open_workbook(file_contents=in_stream)
    sheet = xls_book.sheet_by_index(0)

    dset.title = sheet.name

    for i in range(sheet.nrows):
        if i == 0 and headers:
            dset.headers = sheet.row_values(0)
        else:
            dset.append([
                val if typ != xlrd.XL_CELL_ERROR else xlrd.error_text_from_code[val]
                for val, typ in zip(sheet.row_values(i), sheet.row_types(i))
            ])
def import_book(dbook, in_stream, headers=True):
"""Returns databook from XLS stream."""
dbook.wipe()
xls_book = xlrd.open_workbook(file_contents=in_stream)
for sheet in xls_book.sheets():
data = tablib.Dataset()
data.title = sheet.name
dset.title = sheet.name
for i in range(sheet.nrows):
if (i == 0) and (headers):
data.headers = sheet.row_values(0)
if i == 0 and headers:
dset.headers = sheet.row_values(0)
else:
data.append(sheet.row_values(i))
dset.append([
val if typ != xlrd.XL_CELL_ERROR else xlrd.error_text_from_code[val]
for val, typ in zip(sheet.row_values(i), sheet.row_types(i))
])
dbook.add_sheet(data)
@classmethod
def import_book(cls, dbook, in_stream, headers=True):
"""Returns databook from XLS stream."""
dbook.wipe()
def dset_sheet(dataset, ws):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
xls_book = xlrd.open_workbook(file_contents=in_stream)
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
for sheet in xls_book.sheets():
data = tablib.Dataset()
data.title = sheet.name
for i, row in enumerate(_package):
for j, col in enumerate(row):
for i in range(sheet.nrows):
if i == 0 and headers:
data.headers = sheet.row_values(0)
else:
data.append(sheet.row_values(i))
# bold headers
if (i == 0) and dataset.headers:
ws.write(i, j, col, bold)
dbook.add_sheet(data)
# frozen header row
ws.panes_frozen = True
ws.horz_split_pos = 1
@classmethod
def dset_sheet(cls, dataset, ws):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
# bold separators
elif len(row) < dataset.width:
ws.write(i, j, col, bold)
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
# wrap the rest
else:
try:
if '\n' in col:
ws.write(i, j, col, wrap)
else:
for i, row in enumerate(_package):
for j, col in enumerate(row):
# bold headers
if (i == 0) and dataset.headers:
ws.write(i, j, col, bold)
# frozen header row
ws.panes_frozen = True
ws.horz_split_pos = 1
# bold separators
elif len(row) < dataset.width:
ws.write(i, j, col, bold)
# wrap the rest
else:
try:
if '\n' in col:
ws.write(i, j, col, wrap)
else:
ws.write(i, j, col)
except TypeError:
ws.write(i, j, col)
except TypeError:
ws.write(i, j, col)
+100 -100
View File
@@ -11,130 +11,130 @@ ExcelWriter = openpyxl.writer.excel.ExcelWriter
get_column_letter = openpyxl.utils.get_column_letter
title = 'xlsx'
extensions = ('xlsx',)
class XLSXFormat:
title = 'xlsx'
extensions = ('xlsx',)
@classmethod
def detect(cls, stream):
    """Return True when openpyxl can load `stream` as a workbook."""
    # load_workbook expects a file-like object, so raw bytes are wrapped.
    source = BytesIO(stream) if isinstance(stream, bytes) else stream
    try:
        openpyxl.reader.excel.load_workbook(source, read_only=True)
    except Exception:
        # Any failure to load means the stream is not treated as XLSX.
        return False
    return True
def detect(stream):
    """Return True when openpyxl can load `stream` as a workbook."""
    if isinstance(stream, bytes):
        # load_workbook expects a file-like object, not raw bytes.
        candidate = BytesIO(stream)
    else:
        candidate = stream
    try:
        openpyxl.reader.excel.load_workbook(candidate, read_only=True)
    except Exception:
        # Any failure to load means the stream is not treated as XLSX.
        return False
    else:
        return True
@classmethod
def export_set(cls, dataset, freeze_panes=True):
"""Returns XLSX representation of Dataset."""
wb = Workbook()
ws = wb.worksheets[0]
ws.title = dataset.title if dataset.title else 'Tablib Dataset'
cls.dset_sheet(dataset, ws, freeze_panes=freeze_panes)
def export_set(dataset, freeze_panes=True):
"""Returns XLSX representation of Dataset."""
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
wb = Workbook()
ws = wb.worksheets[0]
ws.title = dataset.title if dataset.title else 'Tablib Dataset'
@classmethod
def export_book(cls, databook, freeze_panes=True):
"""Returns XLSX representation of DataBook."""
dset_sheet(dataset, ws, freeze_panes=freeze_panes)
wb = Workbook()
for sheet in wb.worksheets:
wb.remove(sheet)
for i, dset in enumerate(databook._datasets):
ws = wb.create_sheet()
ws.title = dset.title if dset.title else 'Sheet%s' % (i)
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
cls.dset_sheet(dset, ws, freeze_panes=freeze_panes)
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
def export_book(databook, freeze_panes=True):
"""Returns XLSX representation of DataBook."""
@classmethod
def import_set(cls, dset, in_stream, headers=True):
"""Returns databook from XLS stream."""
wb = Workbook()
for sheet in wb.worksheets:
wb.remove(sheet)
for i, dset in enumerate(databook._datasets):
ws = wb.create_sheet()
ws.title = dset.title if dset.title else 'Sheet%s' % (i)
dset.wipe()
dset_sheet(dset, ws, freeze_panes=freeze_panes)
xls_book = openpyxl.reader.excel.load_workbook(BytesIO(in_stream), read_only=True)
sheet = xls_book.active
stream = BytesIO()
wb.save(stream)
return stream.getvalue()
def import_set(dset, in_stream, headers=True):
    """Populate `dset` from the active sheet of an XLSX stream.

    `dset` is wiped first and nothing is returned. `in_stream` is wrapped
    in a ``BytesIO`` and loaded read-only. When `headers` is true, the
    first row becomes the dataset headers instead of a data row.
    """
    dset.wipe()

    xls_book = openpyxl.reader.excel.load_workbook(BytesIO(in_stream), read_only=True)
    sheet = xls_book.active

    dset.title = sheet.title

    for i, row in enumerate(sheet.rows):
        row_vals = [c.value for c in row]
        if i == 0 and headers:
            dset.headers = row_vals
        else:
            dset.append(row_vals)
def import_book(dbook, in_stream, headers=True):
"""Returns databook from XLS stream."""
dbook.wipe()
xls_book = openpyxl.reader.excel.load_workbook(BytesIO(in_stream), read_only=True)
for sheet in xls_book.worksheets:
data = tablib.Dataset()
data.title = sheet.title
dset.title = sheet.title
for i, row in enumerate(sheet.rows):
row_vals = [c.value for c in row]
if (i == 0) and (headers):
data.headers = row_vals
dset.headers = row_vals
else:
data.append(row_vals)
dset.append(row_vals)
dbook.add_sheet(data)
@classmethod
def import_book(cls, dbook, in_stream, headers=True):
"""Returns databook from XLS stream."""
dbook.wipe()
def dset_sheet(dataset, ws, freeze_panes=True):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
xls_book = openpyxl.reader.excel.load_workbook(BytesIO(in_stream), read_only=True)
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
for sheet in xls_book.worksheets:
data = tablib.Dataset()
data.title = sheet.title
bold = openpyxl.styles.Font(bold=True)
wrap_text = openpyxl.styles.Alignment(wrap_text=True)
for i, row in enumerate(sheet.rows):
row_vals = [c.value for c in row]
if (i == 0) and (headers):
data.headers = row_vals
else:
data.append(row_vals)
for i, row in enumerate(_package):
row_number = i + 1
for j, col in enumerate(row):
col_idx = get_column_letter(j + 1)
cell = ws['{}{}'.format(col_idx, row_number)]
dbook.add_sheet(data)
# bold headers
if (row_number == 1) and dataset.headers:
cell.font = bold
if freeze_panes:
# Export Freeze only after first Line
ws.freeze_panes = 'A2'
@classmethod
def dset_sheet(cls, dataset, ws, freeze_panes=True):
"""Completes given worksheet from given Dataset."""
_package = dataset._package(dicts=False)
# bold separators
elif len(row) < dataset.width:
cell.font = bold
for i, sep in enumerate(dataset._separators):
_offset = i
_package.insert((sep[0] + _offset), (sep[1],))
bold = openpyxl.styles.Font(bold=True)
wrap_text = openpyxl.styles.Alignment(wrap_text=True)
for i, row in enumerate(_package):
row_number = i + 1
for j, col in enumerate(row):
col_idx = get_column_letter(j + 1)
cell = ws['{}{}'.format(col_idx, row_number)]
# bold headers
if (row_number == 1) and dataset.headers:
cell.font = bold
if freeze_panes:
# Export Freeze only after first Line
ws.freeze_panes = 'A2'
# bold separators
elif len(row) < dataset.width:
cell.font = bold
# wrap the rest
else:
try:
str_col_value = str(col)
except TypeError:
str_col_value = ''
if '\n' in str_col_value:
cell.alignment = wrap_text
# wrap the rest
else:
try:
str_col_value = str(col)
except TypeError:
str_col_value = ''
if '\n' in str_col_value:
cell.alignment = wrap_text
try:
cell.value = col
except (ValueError, TypeError):
cell.value = str(col)
cell.value = col
except (ValueError, TypeError):
cell.value = str(col)
+36 -34
View File
@@ -4,48 +4,50 @@
import tablib
import yaml
title = 'yaml'
extensions = ('yaml', 'yml')
class YAMLFormat:
title = 'yaml'
extensions = ('yaml', 'yml')
def export_set(dataset):
"""Returns YAML representation of Dataset."""
@classmethod
def export_set(cls, dataset):
"""Returns YAML representation of Dataset."""
return yaml.safe_dump(dataset._package(ordered=False))
return yaml.safe_dump(dataset._package(ordered=False))
@classmethod
def export_book(cls, databook):
    """Dump the databook's unordered package with ``yaml.safe_dump``."""
    package = databook._package(ordered=False)
    return yaml.safe_dump(package)
def export_book(databook):
    """Dump the databook's unordered package with ``yaml.safe_dump``."""
    unordered = databook._package(ordered=False)
    return yaml.safe_dump(unordered)
@classmethod
def import_set(cls, dset, in_stream):
    """Wipe `dset`, then assign it the data parsed by ``yaml.safe_load``."""
    dset.wipe()
    parsed = yaml.safe_load(in_stream)
    dset.dict = parsed
def import_set(dset, in_stream):
"""Returns dataset from YAML stream."""
@classmethod
def import_book(cls, dbook, in_stream):
"""Returns databook from YAML stream."""
dset.wipe()
dset.dict = yaml.safe_load(in_stream)
dbook.wipe()
for sheet in yaml.safe_load(in_stream):
data = tablib.Dataset()
data.title = sheet['title']
data.dict = sheet['data']
dbook.add_sheet(data)
def import_book(dbook, in_stream):
"""Returns databook from YAML stream."""
dbook.wipe()
for sheet in yaml.safe_load(in_stream):
data = tablib.Dataset()
data.title = sheet['title']
data.dict = sheet['data']
dbook.add_sheet(data)
def detect(stream):
"""Returns True if given stream is valid YAML."""
try:
_yaml = yaml.safe_load(stream)
if isinstance(_yaml, (list, tuple, dict)):
return True
else:
@classmethod
def detect(cls, stream):
"""Returns True if given stream is valid YAML."""
try:
_yaml = yaml.safe_load(stream)
if isinstance(_yaml, (list, tuple, dict)):
return True
else:
return False
except (yaml.parser.ParserError, yaml.reader.ReaderError,
yaml.scanner.ScannerError):
return False
except (yaml.parser.ParserError, yaml.reader.ReaderError,
yaml.scanner.ScannerError):
return False
+38 -21
View File
@@ -12,8 +12,8 @@ from uuid import uuid4
import tablib
from MarkupPy import markup
from tablib.core import Row, detect_format
from tablib.formats import _csv as csv_module
from tablib.core import Row, UnsupportedFormat, detect_format
from tablib.formats import registry
class BaseTestCase(unittest.TestCase):
@@ -282,6 +282,15 @@ class TablibTestCase(BaseTestCase):
unsupported = ['csv', 'tsv', 'jira', 'latex', 'df']
self._test_export_data_in_all_formats(book, exclude=unsupported)
def test_book_unsupported_loading(self):
    """Loading a Databook with the 'csv' format raises UnsupportedFormat."""
    book = tablib.Databook()
    with self.assertRaises(UnsupportedFormat):
        book.load('Any stream', 'csv')
def test_book_unsupported_export(self):
    """Exporting a Databook with the 'csv' format raises UnsupportedFormat."""
    payload = '[{"title": "first", "data": [{"first_name": "John"}]}]'
    book = tablib.Databook().load(payload, 'json')
    with self.assertRaises(UnsupportedFormat):
        book.export('csv')
def test_auto_format_detect(self):
"""Test auto format detection."""
# html, jira, latex, rst are export only.
@@ -618,8 +627,9 @@ class RSTTests(BaseTestCase):
data.append(self.george)
data.headers = self.headers
simple = tablib.formats._rst.export_set(data)
grid = tablib.formats._rst.export_set(data, force_grid=True)
fmt = registry.get_format('rst')
simple = fmt.export_set(data)
grid = fmt.export_set(data, force_grid=True)
self.assertNotEqual(simple, grid)
self.assertNotIn('+', simple)
self.assertIn('+', grid)
@@ -653,8 +663,9 @@ class CSVTests(BaseTestCase):
'¡¡¡¡¡¡¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
)
self.assertTrue(tablib.formats.csv.detect(_csv))
self.assertFalse(tablib.formats.csv.detect(_bunk))
fmt = registry.get_format('csv')
self.assertTrue(fmt.detect(_csv))
self.assertFalse(fmt.detect(_bunk))
def test_csv_import_set(self):
"""Generate and import CSV set serialization."""
@@ -771,7 +782,8 @@ class CSVTests(BaseTestCase):
csv += str(col) + ','
csv = csv.strip(',') + '\r\n'
csv_stream = csv_module.export_stream_set(self.founders)
frm = registry.get_format('csv')
csv_stream = frm.export_stream_set(self.founders)
self.assertEqual(csv, csv_stream.getvalue())
def test_unicode_csv(self):
@@ -868,8 +880,9 @@ class TSVTests(BaseTestCase):
'¡¡¡¡¡¡¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
)
self.assertTrue(tablib.formats.tsv.detect(_tsv))
self.assertFalse(tablib.formats.tsv.detect(_bunk))
fmt = registry.get_format('tsv')
self.assertTrue(fmt.detect(_tsv))
self.assertFalse(fmt.detect(_bunk))
def test_tsv_export(self):
"""Verify exporting dataset object as TSV."""
@@ -947,8 +960,9 @@ class JSONTests(BaseTestCase):
'¡¡¡¡¡¡¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
)
self.assertTrue(tablib.formats.json.detect(_json))
self.assertFalse(tablib.formats.json.detect(_bunk))
fmt = registry.get_format('json')
self.assertTrue(fmt.detect(_json))
self.assertFalse(fmt.detect(_bunk))
def test_json_import_book(self):
"""Generate and import JSON book serialization."""
@@ -1002,12 +1016,14 @@ class YAMLTests(BaseTestCase):
_yaml = '- {age: 90, first_name: John, last_name: Adams}'
_tsv = 'foo\tbar'
_bunk = (
'¡¡¡¡¡¡---///\n\n\n¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
'¡¡¡¡¡¡---///\n\n\n¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†'
'ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
)
self.assertTrue(tablib.formats.yaml.detect(_yaml))
self.assertFalse(tablib.formats.yaml.detect(_bunk))
self.assertFalse(tablib.formats.yaml.detect(_tsv))
fmt = registry.get_format('yaml')
self.assertTrue(fmt.detect(_yaml))
self.assertFalse(fmt.detect(_bunk))
self.assertFalse(fmt.detect(_tsv))
def test_yaml_import_book(self):
"""Generate and import YAML book serialization."""
@@ -1189,12 +1205,13 @@ class DBFTests(BaseTestCase):
_bunk = (
'¡¡¡¡¡¡¡¡£™∞¢£§∞§¶•¶ª∞¶•ªº••ª–º§•†•§º¶•†¥ª–º•§ƒø¥¨©πƒø†ˆ¥ç©¨√øˆ¥≈†ƒ¥ç©ø¨çˆ¥ƒçø¶'
)
self.assertTrue(tablib.formats.dbf.detect(_dbf))
self.assertFalse(tablib.formats.dbf.detect(_yaml))
self.assertFalse(tablib.formats.dbf.detect(_tsv))
self.assertFalse(tablib.formats.dbf.detect(_csv))
self.assertFalse(tablib.formats.dbf.detect(_json))
self.assertFalse(tablib.formats.dbf.detect(_bunk))
fmt = registry.get_format('dbf')
self.assertTrue(fmt.detect(_dbf))
self.assertFalse(fmt.detect(_yaml))
self.assertFalse(fmt.detect(_tsv))
self.assertFalse(fmt.detect(_csv))
self.assertFalse(fmt.detect(_json))
self.assertFalse(fmt.detect(_bunk))
class JiraTests(BaseTestCase):