diff --git a/docs/install.rst b/docs/install.rst
index d9afb2da..4915346c 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -86,4 +86,7 @@ hdf5
     Note that also are additional software to be installed.
 
+parquet
+    For using :ref:`Parquet and other Arrow formats <io_pyarrow>` via PyArrow.
+
 remote
     For reading and writing from :ref:`Remote Sources <io_remotes>` with `fsspec`.
diff --git a/docs/io.rst b/docs/io.rst
index ffac2485..216c55a5 100644
--- a/docs/io.rst
+++ b/docs/io.rst
@@ -390,8 +390,21 @@ Avro files (fastavro)
     :start-after: begin_complex_schema
     :end-before: end_complex_schema
 
+.. module:: petl.io.arrow
+.. _io_pyarrow:
+
+
+Parquet files
+^^^^^^^^^^^^^
+
+These functions read and write Parquet (and other Arrow formats) via PyArrow:
+
+.. autofunction:: petl.io.fromarrow
+.. autofunction:: petl.io.toarrow
+
+
 .. module:: petl.io.gsheet
 .. _io_gsheet:
 
 Google Sheets (gspread)
 ^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/petl/io/__init__.py b/petl/io/__init__.py
index 09199dcf..6febe851 100644
--- a/petl/io/__init__.py
+++ b/petl/io/__init__.py
@@ -45,3 +45,5 @@
 from petl.io.remotes import SMBSource
 
 from petl.io.gsheet import fromgsheet, togsheet, appendgsheet
+
+from petl.io.arrow import fromarrow, toarrow
diff --git a/petl/io/arrow.py b/petl/io/arrow.py
new file mode 100644
index 00000000..8a9a48c4
--- /dev/null
+++ b/petl/io/arrow.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+from collections import OrderedDict
+
+# internal dependencies
+from petl.util.base import Table, header, data
+
+__all__ = (
+    'fromarrow', 'toarrow',
+)
+
+
+def fromarrow(source, **kwargs):
+    """
+    Extract a table from an Arrow-compatible dataset (e.g. Parquet files).
+
+    :param source: file path, list of paths, or directory
+    :param format: dataset format ('parquet', 'orc', 'ipc', ...);
+        default 'parquet'
+    :param columns: list of column names to load; default None (all)
+    :param kwargs: other keyword arguments passed on to
+        pyarrow.dataset.dataset()
+    :returns: a petl table view; rows are streamed batch by batch, so
+        the dataset is never fully materialised in memory
+    """
+    return ArrowView(source, **kwargs)
+
+
+class ArrowView(Table):
+    """Re-iterable petl table view over a pyarrow dataset."""
+
+    def __init__(self, source, **kwargs):
+        self.source = source
+        self.kwargs = kwargs
+
+    def __iter__(self):
+        # lazy import so petl can be imported without pyarrow installed
+        import pyarrow.dataset as ds
+        kwargs = dict(self.kwargs)
+        fmt = kwargs.pop('format', 'parquet')
+        cols_opt = kwargs.pop('columns', None)
+        dataset = ds.dataset(self.source, format=fmt, **kwargs)
+        if cols_opt is None:
+            column_names = [field.name for field in dataset.schema]
+        else:
+            # header must reflect the projected columns, not the schema
+            column_names = list(cols_opt)
+        # header row
+        yield tuple(column_names)
+        # data rows, one record batch at a time
+        for batch in dataset.to_batches(columns=cols_opt):
+            for rec in batch.to_pylist():
+                yield tuple(rec.get(c) for c in column_names)
+
+
+def toarrow(table, target, **kwargs):
+    """
+    Write a table to a Parquet file or an Arrow dataset.
+
+    :param table: petl table (first row is the header)
+    :param target: output file path ('parquet') or directory (other
+        formats)
+    :param format: format name ('parquet', 'ipc', 'orc', ...);
+        default 'parquet'
+    :param schema: optional pyarrow.Schema; default None (infer)
+    :param kwargs: passed on to pyarrow.parquet.write_table() or
+        pyarrow.dataset.write_dataset()
+    :returns: the table passed in, to allow chaining
+
+    Note the whole table is accumulated in memory before writing.
+    """
+    # lazy imports so petl can be imported without pyarrow installed
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+    import pyarrow.dataset as ds
+
+    fmt = kwargs.pop('format', 'parquet')
+    schema = kwargs.pop('schema', None)
+
+    hdr = list(header(table))
+    # accumulate values column-wise, padding short rows with None;
+    # OrderedDict preserves column order on older Pythons
+    columns = OrderedDict((f, []) for f in hdr)
+    for row in data(table):
+        for i, f in enumerate(hdr):
+            columns[f].append(row[i] if i < len(row) else None)
+
+    arrow_tbl = pa.Table.from_pydict(columns, schema=schema)
+
+    if fmt == 'parquet':
+        # single-file Parquet write
+        pq.write_table(arrow_tbl, target, **kwargs)
+    else:
+        # directory-based dataset write for other formats
+        ds.write_dataset(arrow_tbl, target, format=fmt, **kwargs)
+
+    return table
+
+
+# expose on Table for fluent usage, e.g. tbl.toarrow(path);
+# toarrow must bind as an instance method so `self` is the table
+Table.toarrow = toarrow
+Table.fromarrow = staticmethod(fromarrow)
diff --git a/petl/test/io/test_arrow.py b/petl/test/io/test_arrow.py
new file mode 100644
index 00000000..26bf686c
--- /dev/null
+++ b/petl/test/io/test_arrow.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+import pytest
+
+import petl as etl
+from petl.test.helpers import ieq
+
+# skip this whole module when pyarrow is not installed
+pyarrow = pytest.importorskip('pyarrow')
+
+
+TABLE1 = (('foo', 'bar'),
+          ('a', 1),
+          ('b', 2),
+          ('c', 3))
+
+
+def test_roundtrip_parquet(tmpdir):
+    # write then read back a single parquet file
+    target = str(tmpdir.join('test.parquet'))
+    etl.toarrow(etl.wrap(TABLE1), target)
+    actual = etl.fromarrow(target)
+    ieq(TABLE1, actual)
+    # table views must be re-iterable
+    ieq(TABLE1, actual)
+
+
+def test_fromarrow_columns(tmpdir):
+    # column projection restricts both the header and the data rows
+    target = str(tmpdir.join('test.parquet'))
+    etl.toarrow(etl.wrap(TABLE1), target)
+    actual = etl.fromarrow(target, columns=['bar'])
+    ieq((('bar',), (1,), (2,), (3,)), actual)
+
+
+def test_toarrow_short_rows(tmpdir):
+    # values missing from short rows are written as nulls
+    tbl = (('foo', 'bar'), ('a', 1), ('b',))
+    target = str(tmpdir.join('test.parquet'))
+    etl.toarrow(etl.wrap(tbl), target)
+    ieq((('foo', 'bar'), ('a', 1), ('b', None)), etl.fromarrow(target))
+
+
+def test_table_methods(tmpdir):
+    # fluent usage via the methods attached to Table
+    target = str(tmpdir.join('test.parquet'))
+    etl.wrap(TABLE1).toarrow(target)
+    ieq(TABLE1, etl.fromarrow(target))
diff --git a/petl/util/base.py b/petl/util/base.py
index b950d288..b500f40e 100644
--- a/petl/util/base.py
+++ b/petl/util/base.py
@@ -240,7 +240,14 @@
         return r
 
 
 def itervalues(table, field, **kwargs):
+    """
+    Iterate over values from the given field(s).
+
+    As a special case, if `field` is an empty tuple and the table has
+    exactly one column, that column is selected and each value is
+    yielded as a 1-tuple.
+    """
     missing = kwargs.get('missing', None)
     it = iter(table)
     try:
@@ -248,25 +255,32 @@
     except StopIteration:
         hdr = []
 
+    # which column(s) were requested?
     indices = asindices(hdr, field)
-    assert len(indices) > 0, 'no field selected'
-    getvalue = operator.itemgetter(*indices)
+
+    # special case: no field spec and a single-column table -> select that
+    # column, yielding each value wrapped in a 1-tuple
+    wrap_single = False
+    if not indices and field == () and len(hdr) == 1:
+        indices = [0]
+        wrap_single = True
+
+    assert indices, 'no field selected'
+    getter = operator.itemgetter(*indices)
     for row in it:
         try:
-            value = getvalue(row)
-            yield value
+            value = getter(row)
         except IndexError:
             if len(indices) > 1:
-                # try one at a time
-                value = list()
-                for i in indices:
-                    if i < len(row):
-                        value.append(row[i])
-                    else:
-                        value.append(missing)
-                yield tuple(value)
+                # short row: pad missing values, one index at a time
+                yield tuple(row[i] if i < len(row) else missing
+                            for i in indices)
+            elif wrap_single:
+                yield (missing,)
             else:
                 yield missing
+        else:
+            yield (value,) if wrap_single else value
 
 
 class TableWrapper(Table):
diff --git a/requirements-docs.txt b/requirements-docs.txt
index 67d536cd..af818d32 100644
--- a/requirements-docs.txt
+++ b/requirements-docs.txt
@@ -8,3 +8,7 @@
 rinohtype
 setuptools
 setuptools-scm
+
+# for building the parquet/arrow docs examples
+pandas
+pyarrow
diff --git a/requirements-formats.txt b/requirements-formats.txt
index b45b4a82..e9f7cccb 100644
--- a/requirements-formats.txt
+++ b/requirements-formats.txt
@@ -5,7 +5,8 @@
 intervaltree>=3.0.2
 lxml>=4.6.5
 openpyxl>=2.6.2
 pandas
+pyarrow
 Whoosh>=2.7.4
 xlrd>=2.0.1
 xlwt>=1.3.0
diff --git a/requirements-tests.txt b/requirements-tests.txt
index 23b5b5e4..48ab89be 100644
--- a/requirements-tests.txt
+++ b/requirements-tests.txt
@@ -6,5 +6,7 @@
 pytest>=4.6.6,<7.0.0
 tox
 coverage
 coveralls
-mock; python_version < '3.0'
\ No newline at end of file
+mock; python_version < '3.0'
+pandas
+pyarrow
diff --git a/setup.py b/setup.py
index 4afc89a0..d0218ad3 100644
--- a/setup.py
+++ b/setup.py
@@ -34,6 +34,7 @@
         'xlsx': ['openpyxl>=2.6.2'],
         'xpath': ['lxml>=4.4.0'],
         'whoosh': ['whoosh'],
+        'parquet': ['pyarrow>=4.0.0'],
     },
     use_scm_version={
         "version_scheme": "guess-next-dev",